Compare commits
No commits in common. "795010e4957e7c4a5c36416197142a9ebfb5dbe2" and "b0aa6434aea98056ac76f2d7b5ed36b819c4c8c6" have entirely different histories.
795010e495
...
b0aa6434ae
@ -6,7 +6,7 @@ import icon from 'astro-icon';
|
|||||||
// https://astro.build/config
|
// https://astro.build/config
|
||||||
export default defineConfig({
|
export default defineConfig({
|
||||||
// Site URL for sitemap generation
|
// Site URL for sitemap generation
|
||||||
site: 'https://mcbluetooth.warehack.ing',
|
site: 'https://mcbluetooth.supported.systems',
|
||||||
|
|
||||||
// Disable telemetry
|
// Disable telemetry
|
||||||
telemetry: false,
|
telemetry: false,
|
||||||
@ -25,7 +25,7 @@ export default defineConfig({
|
|||||||
replacesTitle: false,
|
replacesTitle: false,
|
||||||
},
|
},
|
||||||
social: [
|
social: [
|
||||||
{ icon: 'github', label: 'Gitea', href: 'https://git.supported.systems/warehack.ing/mcbluetooth' },
|
{ icon: 'github', label: 'GitHub', href: 'https://github.com/yourusername/mcbluetooth' },
|
||||||
],
|
],
|
||||||
customCss: [
|
customCss: [
|
||||||
'./src/styles/custom.css',
|
'./src/styles/custom.css',
|
||||||
@ -80,7 +80,7 @@ export default defineConfig({
|
|||||||
},
|
},
|
||||||
],
|
],
|
||||||
editLink: {
|
editLink: {
|
||||||
baseUrl: 'https://git.supported.systems/warehack.ing/mcbluetooth/_edit/main/docs-site/',
|
baseUrl: 'https://github.com/yourusername/mcbluetooth/edit/main/docs-site/',
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
],
|
],
|
||||||
|
|||||||
@ -69,8 +69,6 @@ ignore = ["E501"]
|
|||||||
[tool.ruff.lint.per-file-ignores]
|
[tool.ruff.lint.per-file-ignores]
|
||||||
# dbus-fast uses D-Bus type signatures ("o", "h", "a{sv}") as annotations
|
# dbus-fast uses D-Bus type signatures ("o", "h", "a{sv}") as annotations
|
||||||
"src/mcbluetooth/hfp_ag.py" = ["F821", "F722"]
|
"src/mcbluetooth/hfp_ag.py" = ["F821", "F722"]
|
||||||
"src/mcbluetooth/gatt_server.py" = ["F821", "F722"]
|
|
||||||
"src/mcbluetooth/spp.py" = ["F821", "F722"]
|
|
||||||
|
|
||||||
[tool.pytest.ini_options]
|
[tool.pytest.ini_options]
|
||||||
asyncio_mode = "auto"
|
asyncio_mode = "auto"
|
||||||
|
|||||||
@ -1,756 +0,0 @@
|
|||||||
"""BLE GATT Server implementation for BlueZ.
|
|
||||||
|
|
||||||
Registers as a BLE peripheral via BlueZ's GattManager1 D-Bus API, allowing
|
|
||||||
mcbluetooth to act as a GATT server. Remote BLE central devices can discover
|
|
||||||
services, read/write characteristics, and subscribe to notifications.
|
|
||||||
|
|
||||||
D-Bus hierarchy exported to BlueZ:
|
|
||||||
/mcbluetooth/gatt ObjectManager
|
|
||||||
/mcbluetooth/gatt/serviceN GattService1
|
|
||||||
/mcbluetooth/gatt/serviceN/charM GattCharacteristic1
|
|
||||||
/mcbluetooth/gatt/serviceN/charM/descK GattDescriptor1
|
|
||||||
/mcbluetooth/gatt/adv0 LEAdvertisement1
|
|
||||||
|
|
||||||
Reference patterns: agent.py (ServiceInterface, pending requests),
|
|
||||||
hfp_ag.py (dedicated bus, module singleton, ProfileManager registration).
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
from collections import deque
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from datetime import UTC, datetime
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from dbus_fast import BusType, Message, MessageType, Variant
|
|
||||||
from dbus_fast.aio import MessageBus
|
|
||||||
from dbus_fast.service import PropertyAccess, ServiceInterface, dbus_property, method
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
BLUEZ_SERVICE = "org.bluez"
|
|
||||||
GATT_MANAGER_IFACE = "org.bluez.GattManager1"
|
|
||||||
LE_ADV_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1"
|
|
||||||
APP_BASE_PATH = "/mcbluetooth/gatt"
|
|
||||||
ADV_PATH = "/mcbluetooth/gatt/adv0"
|
|
||||||
|
|
||||||
|
|
||||||
# ==================== Data Classes ====================
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class WriteEvent:
|
|
||||||
"""A write received from a remote BLE central device."""
|
|
||||||
|
|
||||||
index: int
|
|
||||||
timestamp: str
|
|
||||||
char_id: str
|
|
||||||
char_uuid: str
|
|
||||||
value: bytes
|
|
||||||
value_hex: str
|
|
||||||
value_string: str | None
|
|
||||||
device: str | None
|
|
||||||
|
|
||||||
def to_dict(self) -> dict[str, Any]:
|
|
||||||
d: dict[str, Any] = {
|
|
||||||
"index": self.index,
|
|
||||||
"timestamp": self.timestamp,
|
|
||||||
"char_id": self.char_id,
|
|
||||||
"char_uuid": self.char_uuid,
|
|
||||||
"value_hex": self.value_hex,
|
|
||||||
"length": len(self.value),
|
|
||||||
}
|
|
||||||
if self.value_string is not None:
|
|
||||||
d["value_string"] = self.value_string
|
|
||||||
if self.device:
|
|
||||||
d["device"] = self.device
|
|
||||||
return d
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class ServerCharacteristic:
|
|
||||||
"""Internal state for a server-side GATT characteristic."""
|
|
||||||
|
|
||||||
char_id: str
|
|
||||||
path: str
|
|
||||||
uuid: str
|
|
||||||
flags: list[str]
|
|
||||||
value: bytes = b""
|
|
||||||
notifying: bool = False
|
|
||||||
service_id: str = ""
|
|
||||||
desc_count: int = 0
|
|
||||||
dbus_obj: Any = field(default=None, repr=False)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class ServerDescriptor:
|
|
||||||
"""Internal state for a server-side GATT descriptor."""
|
|
||||||
|
|
||||||
desc_id: str
|
|
||||||
path: str
|
|
||||||
uuid: str
|
|
||||||
flags: list[str]
|
|
||||||
value: bytes = b""
|
|
||||||
char_id: str = ""
|
|
||||||
dbus_obj: Any = field(default=None, repr=False)
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class ServerService:
|
|
||||||
"""Internal state for a server-side GATT service."""
|
|
||||||
|
|
||||||
service_id: str
|
|
||||||
path: str
|
|
||||||
uuid: str
|
|
||||||
primary: bool = True
|
|
||||||
char_count: int = 0
|
|
||||||
characteristics: dict[str, ServerCharacteristic] = field(default_factory=dict)
|
|
||||||
dbus_obj: Any = field(default=None, repr=False)
|
|
||||||
|
|
||||||
|
|
||||||
# ==================== D-Bus Interfaces ====================
|
|
||||||
|
|
||||||
|
|
||||||
class GattApplication(ServiceInterface):
|
|
||||||
"""ObjectManager for the GATT application root.
|
|
||||||
|
|
||||||
BlueZ calls GetManagedObjects() on this path to discover all
|
|
||||||
services, characteristics, and descriptors we're hosting.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, manager: "GattServerManager"):
|
|
||||||
super().__init__("org.freedesktop.DBus.ObjectManager")
|
|
||||||
self._manager = manager
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def GetManagedObjects(self) -> "a{oa{sa{sv}}}": # noqa: F821
|
|
||||||
return self._manager._build_managed_objects()
|
|
||||||
|
|
||||||
|
|
||||||
class GattServiceIface(ServiceInterface):
|
|
||||||
"""org.bluez.GattService1 — exported at each service path.
|
|
||||||
|
|
||||||
Properties must be exposed via @dbus_property so BlueZ can validate
|
|
||||||
the service object through the standard Properties interface.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, uuid: str, primary: bool):
|
|
||||||
super().__init__("org.bluez.GattService1")
|
|
||||||
self._uuid = uuid
|
|
||||||
self._primary = primary
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def UUID(self) -> "s": # noqa: F821
|
|
||||||
return self._uuid
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def Primary(self) -> "b": # noqa: F821
|
|
||||||
return self._primary
|
|
||||||
|
|
||||||
|
|
||||||
class GattCharacteristicIface(ServiceInterface):
|
|
||||||
"""org.bluez.GattCharacteristic1 — handles reads/writes from remotes.
|
|
||||||
|
|
||||||
ReadValue returns the stored value. WriteValue stores the value AND
|
|
||||||
records a WriteEvent for LLM polling (like agent.py's pending_requests).
|
|
||||||
Properties exposed via @dbus_property for BlueZ introspection.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, char: ServerCharacteristic, manager: "GattServerManager"):
|
|
||||||
super().__init__("org.bluez.GattCharacteristic1")
|
|
||||||
self._char = char
|
|
||||||
self._manager = manager
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def UUID(self) -> "s": # noqa: F821
|
|
||||||
return self._char.uuid
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def Service(self) -> "o": # noqa: F821
|
|
||||||
return f"{APP_BASE_PATH}/{self._char.service_id}"
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def Flags(self) -> "as": # noqa: F821
|
|
||||||
return self._char.flags
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def ReadValue(self, options: "a{sv}") -> "ay": # noqa: F821
|
|
||||||
offset = 0
|
|
||||||
opt_offset = options.get("offset")
|
|
||||||
if opt_offset is not None:
|
|
||||||
offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset)
|
|
||||||
return list(self._char.value[offset:])
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def WriteValue(self, value: "ay", options: "a{sv}") -> None: # noqa: F821
|
|
||||||
new_bytes = bytes(value)
|
|
||||||
offset = 0
|
|
||||||
opt_offset = options.get("offset")
|
|
||||||
if opt_offset is not None:
|
|
||||||
offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset)
|
|
||||||
|
|
||||||
if offset > 0:
|
|
||||||
self._char.value = self._char.value[:offset] + new_bytes
|
|
||||||
else:
|
|
||||||
self._char.value = new_bytes
|
|
||||||
|
|
||||||
device = None
|
|
||||||
opt_device = options.get("device")
|
|
||||||
if opt_device is not None:
|
|
||||||
device = opt_device.value if hasattr(opt_device, "value") else str(opt_device)
|
|
||||||
|
|
||||||
self._manager._record_write(self._char.char_id, self._char.uuid, self._char.value, device)
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def StartNotify(self) -> None:
|
|
||||||
self._char.notifying = True
|
|
||||||
log.info("GATT server: StartNotify on %s (%s)", self._char.char_id, self._char.uuid)
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def StopNotify(self) -> None:
|
|
||||||
self._char.notifying = False
|
|
||||||
log.info("GATT server: StopNotify on %s (%s)", self._char.char_id, self._char.uuid)
|
|
||||||
|
|
||||||
|
|
||||||
class GattDescriptorIface(ServiceInterface):
|
|
||||||
"""org.bluez.GattDescriptor1 — handles reads/writes on descriptors.
|
|
||||||
|
|
||||||
Properties exposed via @dbus_property for BlueZ introspection.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, desc: ServerDescriptor):
|
|
||||||
super().__init__("org.bluez.GattDescriptor1")
|
|
||||||
self._desc = desc
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def UUID(self) -> "s": # noqa: F821
|
|
||||||
return self._desc.uuid
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def Characteristic(self) -> "o": # noqa: F821
|
|
||||||
return f"{APP_BASE_PATH}/{self._desc.char_id}"
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def Flags(self) -> "as": # noqa: F821
|
|
||||||
return self._desc.flags
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def ReadValue(self, options: "a{sv}") -> "ay": # noqa: F821
|
|
||||||
offset = 0
|
|
||||||
opt_offset = options.get("offset")
|
|
||||||
if opt_offset is not None:
|
|
||||||
offset = opt_offset.value if hasattr(opt_offset, "value") else int(opt_offset)
|
|
||||||
return list(self._desc.value[offset:])
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def WriteValue(self, value: "ay", options: "a{sv}") -> None: # noqa: F821
|
|
||||||
self._desc.value = bytes(value)
|
|
||||||
|
|
||||||
|
|
||||||
class LEAdvertisementIface(ServiceInterface):
|
|
||||||
"""org.bluez.LEAdvertisement1 — BLE advertising data.
|
|
||||||
|
|
||||||
BlueZ reads advertisement properties via Properties.GetAll, so these
|
|
||||||
must be exposed as @dbus_property (unlike GATT objects which use
|
|
||||||
GetManagedObjects).
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, adv_type: str, local_name: str, service_uuids: list[str]):
|
|
||||||
super().__init__("org.bluez.LEAdvertisement1")
|
|
||||||
self._type = adv_type
|
|
||||||
self._local_name = local_name
|
|
||||||
self._service_uuids = service_uuids
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def Release(self) -> None:
|
|
||||||
log.info("GATT server: advertisement released by BlueZ")
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def Type(self) -> "s": # noqa: F821
|
|
||||||
return self._type
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def LocalName(self) -> "s": # noqa: F821
|
|
||||||
return self._local_name
|
|
||||||
|
|
||||||
@dbus_property(PropertyAccess.READ)
|
|
||||||
def ServiceUUIDs(self) -> "as": # noqa: F821
|
|
||||||
return self._service_uuids
|
|
||||||
|
|
||||||
|
|
||||||
# ==================== GATT Server Manager ====================
|
|
||||||
|
|
||||||
|
|
||||||
class GattServerManager:
|
|
||||||
"""Manages the GATT server lifecycle and state.
|
|
||||||
|
|
||||||
Coordinates D-Bus object creation, BlueZ registration,
|
|
||||||
advertising, and write event buffering.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self._bus: MessageBus | None = None
|
|
||||||
self._app: GattApplication | None = None
|
|
||||||
self._adv: LEAdvertisementIface | None = None
|
|
||||||
self._services: dict[str, ServerService] = {}
|
|
||||||
self._characteristics: dict[str, ServerCharacteristic] = {}
|
|
||||||
self._descriptors: dict[str, ServerDescriptor] = {}
|
|
||||||
self._write_events: deque[WriteEvent] = deque(maxlen=200)
|
|
||||||
self._write_index: int = 0
|
|
||||||
self._service_count: int = 0
|
|
||||||
self._registered: bool = False
|
|
||||||
self._advertising: bool = False
|
|
||||||
self._write_callback: Any = None # async callable(char_id, value, device)
|
|
||||||
|
|
||||||
# ---- Service construction ----
|
|
||||||
|
|
||||||
def add_service(self, uuid: str, primary: bool = True) -> str:
|
|
||||||
"""Add a GATT service. Returns the service_id (e.g. 'service0')."""
|
|
||||||
service_id = f"service{self._service_count}"
|
|
||||||
self._service_count += 1
|
|
||||||
path = f"{APP_BASE_PATH}/{service_id}"
|
|
||||||
|
|
||||||
svc = ServerService(
|
|
||||||
service_id=service_id,
|
|
||||||
path=path,
|
|
||||||
uuid=uuid,
|
|
||||||
primary=primary,
|
|
||||||
)
|
|
||||||
svc.dbus_obj = GattServiceIface(uuid, primary)
|
|
||||||
self._services[service_id] = svc
|
|
||||||
|
|
||||||
log.info("GATT server: added service %s (UUID=%s)", service_id, uuid)
|
|
||||||
return service_id
|
|
||||||
|
|
||||||
def add_characteristic(
|
|
||||||
self,
|
|
||||||
service_id: str,
|
|
||||||
uuid: str,
|
|
||||||
flags: list[str],
|
|
||||||
value: bytes = b"",
|
|
||||||
) -> str:
|
|
||||||
"""Add a characteristic to a service. Returns the char_id."""
|
|
||||||
svc = self._services.get(service_id)
|
|
||||||
if not svc:
|
|
||||||
raise ValueError(f"Service '{service_id}' not found")
|
|
||||||
|
|
||||||
char_idx = svc.char_count
|
|
||||||
svc.char_count += 1
|
|
||||||
char_id = f"{service_id}/char{char_idx}"
|
|
||||||
path = f"{APP_BASE_PATH}/{char_id}"
|
|
||||||
|
|
||||||
char = ServerCharacteristic(
|
|
||||||
char_id=char_id,
|
|
||||||
path=path,
|
|
||||||
uuid=uuid,
|
|
||||||
flags=flags,
|
|
||||||
value=value,
|
|
||||||
service_id=service_id,
|
|
||||||
)
|
|
||||||
char.dbus_obj = GattCharacteristicIface(char, self)
|
|
||||||
svc.characteristics[char_id] = char
|
|
||||||
self._characteristics[char_id] = char
|
|
||||||
|
|
||||||
log.info(
|
|
||||||
"GATT server: added characteristic %s (UUID=%s, flags=%s)",
|
|
||||||
char_id,
|
|
||||||
uuid,
|
|
||||||
flags,
|
|
||||||
)
|
|
||||||
return char_id
|
|
||||||
|
|
||||||
def add_descriptor(
|
|
||||||
self,
|
|
||||||
char_id: str,
|
|
||||||
uuid: str,
|
|
||||||
flags: list[str],
|
|
||||||
value: bytes = b"",
|
|
||||||
) -> str:
|
|
||||||
"""Add a descriptor to a characteristic. Returns the desc_id."""
|
|
||||||
char = self._characteristics.get(char_id)
|
|
||||||
if not char:
|
|
||||||
raise ValueError(f"Characteristic '{char_id}' not found")
|
|
||||||
|
|
||||||
desc_idx = char.desc_count
|
|
||||||
char.desc_count += 1
|
|
||||||
desc_id = f"{char_id}/desc{desc_idx}"
|
|
||||||
path = f"{APP_BASE_PATH}/{desc_id}"
|
|
||||||
|
|
||||||
desc = ServerDescriptor(
|
|
||||||
desc_id=desc_id,
|
|
||||||
path=path,
|
|
||||||
uuid=uuid,
|
|
||||||
flags=flags,
|
|
||||||
value=value,
|
|
||||||
char_id=char_id,
|
|
||||||
)
|
|
||||||
desc.dbus_obj = GattDescriptorIface(desc)
|
|
||||||
self._descriptors[desc_id] = desc
|
|
||||||
|
|
||||||
log.info("GATT server: added descriptor %s (UUID=%s)", desc_id, uuid)
|
|
||||||
return desc_id
|
|
||||||
|
|
||||||
def clear(self) -> None:
|
|
||||||
"""Remove all services, characteristics, and descriptors."""
|
|
||||||
if self._registered:
|
|
||||||
raise RuntimeError("Cannot clear while registered — unregister first")
|
|
||||||
|
|
||||||
self._services.clear()
|
|
||||||
self._characteristics.clear()
|
|
||||||
self._descriptors.clear()
|
|
||||||
self._service_count = 0
|
|
||||||
|
|
||||||
# ---- Registration with BlueZ ----
|
|
||||||
|
|
||||||
async def register(self, adapter: str) -> None:
|
|
||||||
"""Register the GATT application with BlueZ GattManager1."""
|
|
||||||
if self._registered:
|
|
||||||
return
|
|
||||||
|
|
||||||
if not self._services:
|
|
||||||
raise ValueError("No services defined — add at least one service first")
|
|
||||||
|
|
||||||
# Dedicated bus connection (like hfp_ag.py)
|
|
||||||
if not self._bus:
|
|
||||||
self._bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
|
|
||||||
|
|
||||||
# Export ObjectManager at application root
|
|
||||||
self._app = GattApplication(self)
|
|
||||||
self._bus.export(APP_BASE_PATH, self._app)
|
|
||||||
|
|
||||||
# Export all service/characteristic/descriptor D-Bus objects
|
|
||||||
for svc in self._services.values():
|
|
||||||
self._bus.export(svc.path, svc.dbus_obj)
|
|
||||||
for char in svc.characteristics.values():
|
|
||||||
self._bus.export(char.path, char.dbus_obj)
|
|
||||||
for desc in self._descriptors.values():
|
|
||||||
self._bus.export(desc.path, desc.dbus_obj)
|
|
||||||
|
|
||||||
# Register with BlueZ
|
|
||||||
adapter_path = f"/org/bluez/{adapter}"
|
|
||||||
introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path)
|
|
||||||
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection)
|
|
||||||
gatt_mgr = proxy.get_interface(GATT_MANAGER_IFACE)
|
|
||||||
|
|
||||||
try:
|
|
||||||
await gatt_mgr.call_register_application(APP_BASE_PATH, {})
|
|
||||||
self._registered = True
|
|
||||||
log.info("GATT server: registered with BlueZ (%s)", adapter)
|
|
||||||
except Exception as e:
|
|
||||||
if "Already Exists" in str(e):
|
|
||||||
log.info("GATT server: stale registration — re-registering")
|
|
||||||
try:
|
|
||||||
await gatt_mgr.call_unregister_application(APP_BASE_PATH)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
await gatt_mgr.call_register_application(APP_BASE_PATH, {})
|
|
||||||
self._registered = True
|
|
||||||
log.info("GATT server: re-registered with BlueZ (%s)", adapter)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
async def unregister(self, adapter: str) -> None:
|
|
||||||
"""Unregister the GATT application from BlueZ."""
|
|
||||||
if self._advertising:
|
|
||||||
await self.set_advertising(adapter, False)
|
|
||||||
|
|
||||||
if self._bus and self._registered:
|
|
||||||
try:
|
|
||||||
adapter_path = f"/org/bluez/{adapter}"
|
|
||||||
introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path)
|
|
||||||
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection)
|
|
||||||
gatt_mgr = proxy.get_interface(GATT_MANAGER_IFACE)
|
|
||||||
await gatt_mgr.call_unregister_application(APP_BASE_PATH)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
self._registered = False
|
|
||||||
|
|
||||||
# Disconnect bus (cleans up all exports)
|
|
||||||
if self._bus:
|
|
||||||
self._bus.disconnect()
|
|
||||||
self._bus = None
|
|
||||||
|
|
||||||
self._app = None
|
|
||||||
self._adv = None
|
|
||||||
log.info("GATT server: unregistered")
|
|
||||||
|
|
||||||
# ---- Advertising ----
|
|
||||||
|
|
||||||
async def set_advertising(
|
|
||||||
self,
|
|
||||||
adapter: str,
|
|
||||||
enable: bool,
|
|
||||||
name: str = "mcbluetooth",
|
|
||||||
service_uuids: list[str] | None = None,
|
|
||||||
) -> None:
|
|
||||||
"""Start or stop BLE advertising."""
|
|
||||||
if not self._bus:
|
|
||||||
raise RuntimeError("Bus not connected — register first")
|
|
||||||
|
|
||||||
adapter_path = f"/org/bluez/{adapter}"
|
|
||||||
introspection = await self._bus.introspect(BLUEZ_SERVICE, adapter_path)
|
|
||||||
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, adapter_path, introspection)
|
|
||||||
adv_mgr = proxy.get_interface(LE_ADV_MANAGER_IFACE)
|
|
||||||
|
|
||||||
if enable:
|
|
||||||
if self._advertising:
|
|
||||||
return
|
|
||||||
|
|
||||||
uuids = service_uuids or [svc.uuid for svc in self._services.values()]
|
|
||||||
self._adv = LEAdvertisementIface("peripheral", name, uuids)
|
|
||||||
self._bus.export(ADV_PATH, self._adv)
|
|
||||||
|
|
||||||
try:
|
|
||||||
await adv_mgr.call_register_advertisement(ADV_PATH, {})
|
|
||||||
self._advertising = True
|
|
||||||
log.info("GATT server: advertising started (name=%s)", name)
|
|
||||||
except Exception as e:
|
|
||||||
if "Already Exists" in str(e):
|
|
||||||
try:
|
|
||||||
await adv_mgr.call_unregister_advertisement(ADV_PATH)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
await adv_mgr.call_register_advertisement(ADV_PATH, {})
|
|
||||||
self._advertising = True
|
|
||||||
log.info("GATT server: advertising restarted (name=%s)", name)
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
else:
|
|
||||||
if not self._advertising:
|
|
||||||
return
|
|
||||||
try:
|
|
||||||
await adv_mgr.call_unregister_advertisement(ADV_PATH)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
self._advertising = False
|
|
||||||
self._adv = None
|
|
||||||
log.info("GATT server: advertising stopped")
|
|
||||||
|
|
||||||
# ---- Value management ----
|
|
||||||
|
|
||||||
def set_value(self, char_id: str, value: bytes) -> None:
|
|
||||||
"""Set characteristic value and auto-notify if subscribed."""
|
|
||||||
char = self._characteristics.get(char_id)
|
|
||||||
if not char:
|
|
||||||
raise ValueError(f"Characteristic '{char_id}' not found")
|
|
||||||
|
|
||||||
char.value = value
|
|
||||||
if char.notifying:
|
|
||||||
self._emit_notification(char)
|
|
||||||
|
|
||||||
def get_value(self, char_id: str) -> bytes:
|
|
||||||
"""Get current characteristic value."""
|
|
||||||
char = self._characteristics.get(char_id)
|
|
||||||
if not char:
|
|
||||||
raise ValueError(f"Characteristic '{char_id}' not found")
|
|
||||||
return char.value
|
|
||||||
|
|
||||||
def notify(self, char_id: str) -> bool:
|
|
||||||
"""Explicitly send notification for current value."""
|
|
||||||
char = self._characteristics.get(char_id)
|
|
||||||
if not char:
|
|
||||||
raise ValueError(f"Characteristic '{char_id}' not found")
|
|
||||||
if not char.notifying:
|
|
||||||
return False
|
|
||||||
self._emit_notification(char)
|
|
||||||
return True
|
|
||||||
|
|
||||||
def _emit_notification(self, char: ServerCharacteristic) -> None:
|
|
||||||
"""Emit PropertiesChanged signal to trigger BLE notification."""
|
|
||||||
if not self._bus:
|
|
||||||
return
|
|
||||||
|
|
||||||
msg = Message(
|
|
||||||
message_type=MessageType.SIGNAL,
|
|
||||||
path=char.path,
|
|
||||||
interface="org.freedesktop.DBus.Properties",
|
|
||||||
member="PropertiesChanged",
|
|
||||||
signature="sa{sv}as",
|
|
||||||
body=[
|
|
||||||
"org.bluez.GattCharacteristic1",
|
|
||||||
{"Value": Variant("ay", list(char.value))},
|
|
||||||
[],
|
|
||||||
],
|
|
||||||
)
|
|
||||||
self._bus.send_message(msg)
|
|
||||||
log.debug("GATT server: notification sent on %s (%d bytes)", char.char_id, len(char.value))
|
|
||||||
|
|
||||||
# ---- Write event monitoring ----
|
|
||||||
|
|
||||||
def _record_write(self, char_id: str, char_uuid: str, value: bytes, device: str | None) -> None:
|
|
||||||
"""Record a write event and fire callback if registered."""
|
|
||||||
event = WriteEvent(
|
|
||||||
index=self._write_index,
|
|
||||||
timestamp=datetime.now(UTC).isoformat(),
|
|
||||||
char_id=char_id,
|
|
||||||
char_uuid=char_uuid,
|
|
||||||
value=value,
|
|
||||||
value_hex=value.hex(),
|
|
||||||
value_string=_try_decode(value),
|
|
||||||
device=device,
|
|
||||||
)
|
|
||||||
self._write_events.append(event)
|
|
||||||
self._write_index += 1
|
|
||||||
|
|
||||||
log.debug(
|
|
||||||
"GATT server: write on %s from %s (%d bytes)",
|
|
||||||
char_id,
|
|
||||||
device or "unknown",
|
|
||||||
len(value),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Fire async callback (e.g. ELM327 emulator auto-responder)
|
|
||||||
if self._write_callback:
|
|
||||||
try:
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
loop.create_task(self._write_callback(char_id, value, device))
|
|
||||||
except RuntimeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_write_events(
|
|
||||||
self,
|
|
||||||
since_index: int = 0,
|
|
||||||
char_id: str | None = None,
|
|
||||||
limit: int = 50,
|
|
||||||
) -> list[dict[str, Any]]:
|
|
||||||
"""Get write events, optionally filtered."""
|
|
||||||
events = [
|
|
||||||
e
|
|
||||||
for e in self._write_events
|
|
||||||
if e.index >= since_index and (char_id is None or e.char_id == char_id)
|
|
||||||
]
|
|
||||||
return [e.to_dict() for e in events[:limit]]
|
|
||||||
|
|
||||||
def clear_write_events(self) -> int:
|
|
||||||
"""Clear all write events. Returns count cleared."""
|
|
||||||
count = len(self._write_events)
|
|
||||||
self._write_events.clear()
|
|
||||||
return count
|
|
||||||
|
|
||||||
# ---- GetManagedObjects builder ----
|
|
||||||
|
|
||||||
def _build_managed_objects(self) -> dict[str, dict[str, dict[str, Variant]]]:
|
|
||||||
"""Build the response for ObjectManager.GetManagedObjects().
|
|
||||||
|
|
||||||
Returns the complete D-Bus object hierarchy that BlueZ uses
|
|
||||||
to discover our GATT services, characteristics, and descriptors.
|
|
||||||
"""
|
|
||||||
objects: dict[str, dict[str, dict[str, Variant]]] = {}
|
|
||||||
|
|
||||||
for svc in self._services.values():
|
|
||||||
char_paths = [c.path for c in svc.characteristics.values()]
|
|
||||||
objects[svc.path] = {
|
|
||||||
"org.bluez.GattService1": {
|
|
||||||
"UUID": Variant("s", svc.uuid),
|
|
||||||
"Primary": Variant("b", svc.primary),
|
|
||||||
"Characteristics": Variant("ao", char_paths),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for char in svc.characteristics.values():
|
|
||||||
desc_paths = [
|
|
||||||
d.path for d in self._descriptors.values() if d.char_id == char.char_id
|
|
||||||
]
|
|
||||||
objects[char.path] = {
|
|
||||||
"org.bluez.GattCharacteristic1": {
|
|
||||||
"UUID": Variant("s", char.uuid),
|
|
||||||
"Service": Variant("o", svc.path),
|
|
||||||
"Flags": Variant("as", char.flags),
|
|
||||||
"Descriptors": Variant("ao", desc_paths),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for desc in self._descriptors.values():
|
|
||||||
char = self._characteristics.get(desc.char_id)
|
|
||||||
if char:
|
|
||||||
objects[desc.path] = {
|
|
||||||
"org.bluez.GattDescriptor1": {
|
|
||||||
"UUID": Variant("s", desc.uuid),
|
|
||||||
"Characteristic": Variant("o", char.path),
|
|
||||||
"Flags": Variant("as", desc.flags),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return objects
|
|
||||||
|
|
||||||
# ---- Status ----
|
|
||||||
|
|
||||||
def get_status(self) -> dict[str, Any]:
|
|
||||||
"""Get overall GATT server status."""
|
|
||||||
services = []
|
|
||||||
for svc in self._services.values():
|
|
||||||
chars = []
|
|
||||||
for char in svc.characteristics.values():
|
|
||||||
descs = [
|
|
||||||
{
|
|
||||||
"desc_id": d.desc_id,
|
|
||||||
"uuid": d.uuid,
|
|
||||||
"flags": d.flags,
|
|
||||||
}
|
|
||||||
for d in self._descriptors.values()
|
|
||||||
if d.char_id == char.char_id
|
|
||||||
]
|
|
||||||
chars.append(
|
|
||||||
{
|
|
||||||
"char_id": char.char_id,
|
|
||||||
"uuid": char.uuid,
|
|
||||||
"flags": char.flags,
|
|
||||||
"value_length": len(char.value),
|
|
||||||
"notifying": char.notifying,
|
|
||||||
"descriptors": descs,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
services.append(
|
|
||||||
{
|
|
||||||
"service_id": svc.service_id,
|
|
||||||
"uuid": svc.uuid,
|
|
||||||
"primary": svc.primary,
|
|
||||||
"characteristics": chars,
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"registered": self._registered,
|
|
||||||
"advertising": self._advertising,
|
|
||||||
"services": services,
|
|
||||||
"write_event_count": len(self._write_events),
|
|
||||||
"write_event_total": self._write_index,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
# ==================== Module-level lifecycle ====================
|
|
||||||
|
|
||||||
_manager: GattServerManager | None = None
|
|
||||||
|
|
||||||
|
|
||||||
def get_gatt_server() -> GattServerManager:
|
|
||||||
"""Get or create the global GATT server manager (singleton)."""
|
|
||||||
global _manager
|
|
||||||
if _manager is None:
|
|
||||||
_manager = GattServerManager()
|
|
||||||
return _manager
|
|
||||||
|
|
||||||
|
|
||||||
async def shutdown_gatt_server(adapter: str) -> None:
|
|
||||||
"""Shut down the GATT server completely."""
|
|
||||||
global _manager
|
|
||||||
if _manager:
|
|
||||||
try:
|
|
||||||
await _manager.unregister(adapter)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
_manager = None
|
|
||||||
|
|
||||||
|
|
||||||
# ==================== Helpers ====================
|
|
||||||
|
|
||||||
|
|
||||||
def _try_decode(value: bytes) -> str | None:
|
|
||||||
"""Try to decode bytes as printable UTF-8."""
|
|
||||||
try:
|
|
||||||
decoded = value.decode("utf-8")
|
|
||||||
if all(c.isprintable() or c in "\r\n\t" for c in decoded):
|
|
||||||
return decoded
|
|
||||||
return None
|
|
||||||
except (UnicodeDecodeError, ValueError):
|
|
||||||
return None
|
|
||||||
@ -17,7 +17,6 @@ from dataclasses import asdict
|
|||||||
from fastmcp import FastMCP
|
from fastmcp import FastMCP
|
||||||
|
|
||||||
from mcbluetooth.dbus_client import get_client, get_notify_manager
|
from mcbluetooth.dbus_client import get_client, get_notify_manager
|
||||||
from mcbluetooth.gatt_server import get_gatt_server
|
|
||||||
|
|
||||||
|
|
||||||
def register_resources(mcp: FastMCP) -> None:
|
def register_resources(mcp: FastMCP) -> None:
|
||||||
@ -308,99 +307,3 @@ def register_resources(mcp: FastMCP) -> None:
|
|||||||
},
|
},
|
||||||
indent=2,
|
indent=2,
|
||||||
)
|
)
|
||||||
|
|
||||||
# ==================== SPP Resources ====================
|
|
||||||
|
|
||||||
@mcp.resource(
|
|
||||||
"bluetooth://spp/connections",
|
|
||||||
name="SPP Connections",
|
|
||||||
description=(
|
|
||||||
"Active SPP (Serial Port Profile) connections with role, "
|
|
||||||
"duration, and byte counters for each peer."
|
|
||||||
),
|
|
||||||
mime_type="application/json",
|
|
||||||
)
|
|
||||||
async def resource_spp_connections() -> str:
|
|
||||||
"""Get active SPP connections."""
|
|
||||||
from mcbluetooth.spp import get_spp
|
|
||||||
|
|
||||||
profile = await get_spp()
|
|
||||||
if not profile:
|
|
||||||
return json.dumps({"registered": False, "connections": []})
|
|
||||||
|
|
||||||
status = profile.get_status()
|
|
||||||
return json.dumps(
|
|
||||||
{
|
|
||||||
"registered": status["registered"],
|
|
||||||
"uuid": status["uuid"],
|
|
||||||
"connections": status["connections"],
|
|
||||||
},
|
|
||||||
indent=2,
|
|
||||||
)
|
|
||||||
|
|
||||||
@mcp.resource(
|
|
||||||
"bluetooth://spp/data",
|
|
||||||
name="SPP Received Data",
|
|
||||||
description=(
|
|
||||||
"Recent data received from SPP peers. Returns the last 50 "
|
|
||||||
"data events with timestamps, addresses, and hex/string values."
|
|
||||||
),
|
|
||||||
mime_type="application/json",
|
|
||||||
)
|
|
||||||
async def resource_spp_data() -> str:
|
|
||||||
"""Get recent SPP received data events."""
|
|
||||||
from mcbluetooth.spp import get_spp
|
|
||||||
|
|
||||||
profile = await get_spp()
|
|
||||||
if not profile:
|
|
||||||
return json.dumps({"count": 0, "events": [], "hint": "SPP not enabled"})
|
|
||||||
|
|
||||||
events = profile.get_recv_events(since_index=0, limit=50)
|
|
||||||
return json.dumps(
|
|
||||||
{
|
|
||||||
"count": len(events),
|
|
||||||
"total": profile._recv_index,
|
|
||||||
"events": events,
|
|
||||||
},
|
|
||||||
indent=2,
|
|
||||||
)
|
|
||||||
|
|
||||||
# ==================== GATT Server Resources ====================
|
|
||||||
|
|
||||||
@mcp.resource(
|
|
||||||
"bluetooth://gatt/server",
|
|
||||||
name="GATT Server State",
|
|
||||||
description=(
|
|
||||||
"Current state of the BLE GATT server (peripheral mode). "
|
|
||||||
"Shows registered services, characteristics, advertising status, "
|
|
||||||
"and write event statistics."
|
|
||||||
),
|
|
||||||
mime_type="application/json",
|
|
||||||
)
|
|
||||||
async def resource_gatt_server() -> str:
|
|
||||||
"""Get GATT server state."""
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
return json.dumps(mgr.get_status(), indent=2)
|
|
||||||
|
|
||||||
@mcp.resource(
|
|
||||||
"bluetooth://gatt/server/writes",
|
|
||||||
name="GATT Server Write Events",
|
|
||||||
description=(
|
|
||||||
"Write events received from remote BLE central devices. "
|
|
||||||
"Shows the most recent writes to server characteristics "
|
|
||||||
"with timestamps, values, and source device info."
|
|
||||||
),
|
|
||||||
mime_type="application/json",
|
|
||||||
)
|
|
||||||
async def resource_gatt_server_writes() -> str:
|
|
||||||
"""Get recent write events from remote clients."""
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
events = mgr.get_write_events(since_index=0, limit=50)
|
|
||||||
return json.dumps(
|
|
||||||
{
|
|
||||||
"count": len(events),
|
|
||||||
"total": mgr._write_index,
|
|
||||||
"events": events,
|
|
||||||
},
|
|
||||||
indent=2,
|
|
||||||
)
|
|
||||||
|
|||||||
@ -3,18 +3,7 @@
|
|||||||
from fastmcp import FastMCP
|
from fastmcp import FastMCP
|
||||||
|
|
||||||
from mcbluetooth import resources
|
from mcbluetooth import resources
|
||||||
from mcbluetooth.tools import (
|
from mcbluetooth.tools import adapter, audio, ble, device, hfp, monitor, obex
|
||||||
adapter,
|
|
||||||
audio,
|
|
||||||
ble,
|
|
||||||
bt_elm327_emu,
|
|
||||||
device,
|
|
||||||
gatt_server,
|
|
||||||
hfp,
|
|
||||||
monitor,
|
|
||||||
obex,
|
|
||||||
spp,
|
|
||||||
)
|
|
||||||
|
|
||||||
mcp = FastMCP(
|
mcp = FastMCP(
|
||||||
name="mcbluetooth",
|
name="mcbluetooth",
|
||||||
@ -47,41 +36,10 @@ To capture BLE notifications:
|
|||||||
3. Read notifications: Use the bluetooth://ble/{address}/{uuid}/notifications resource
|
3. Read notifications: Use the bluetooth://ble/{address}/{uuid}/notifications resource
|
||||||
4. Subscribe to the resource for real-time updates (client-side)
|
4. Subscribe to the resource for real-time updates (client-side)
|
||||||
|
|
||||||
### GATT Server Resources (peripheral mode)
|
|
||||||
- bluetooth://gatt/server - GATT server state (registered, services, advertising)
|
|
||||||
- bluetooth://gatt/server/writes - Write events from remote clients
|
|
||||||
|
|
||||||
To act as a BLE peripheral (GATT server):
|
|
||||||
1. Add services: bt_gatt_server_add_service(uuid)
|
|
||||||
2. Add characteristics: bt_gatt_server_add_characteristic(service_id, uuid, flags)
|
|
||||||
3. Register: bt_gatt_server_register(adapter)
|
|
||||||
4. Advertise: bt_gatt_server_advertise(adapter, enable=True, name="MyDevice")
|
|
||||||
5. Monitor writes: bt_gatt_server_read_writes(since_index=0)
|
|
||||||
6. Respond: bt_gatt_server_set_value(char_id, value) — auto-notifies subscribers
|
|
||||||
|
|
||||||
### ELM327 OBD-II Emulator
|
|
||||||
For quick OBD-II BLE adapter emulation (Nordic UART Service):
|
|
||||||
1. bt_elm327_emu_start(adapter, name="OBDII") — sets up NUS, advertises
|
|
||||||
2. bt_elm327_emu_set_pid(0x0C, "0BB8", "hex") — set RPM to 750
|
|
||||||
3. bt_elm327_emu_read_commands() — see what the OBD app sent
|
|
||||||
4. bt_elm327_emu_stop(adapter) — clean up
|
|
||||||
|
|
||||||
## Tools
|
## Tools
|
||||||
All tools require an explicit 'adapter' parameter (e.g., "hci0").
|
All tools require an explicit 'adapter' parameter (e.g., "hci0").
|
||||||
Use bt_list_adapters() to discover available adapters.
|
Use bt_list_adapters() to discover available adapters.
|
||||||
|
|
||||||
### SPP (Serial Port Profile) — Classic Bluetooth Serial
|
|
||||||
- bluetooth://spp/connections — Active SPP connections
|
|
||||||
- bluetooth://spp/data — Recent received data events
|
|
||||||
|
|
||||||
For classic Bluetooth serial (RFCOMM):
|
|
||||||
1. bt_spp_enable() — register profile
|
|
||||||
2. Server: make discoverable, remote connects automatically
|
|
||||||
3. Client: bt_spp_connect(adapter, address)
|
|
||||||
4. Send: bt_spp_send(address, "ATZ\\r\\n", "string")
|
|
||||||
5. Receive: bt_spp_recv(since_index=0) — cursor-based polling
|
|
||||||
6. Disconnect: bt_spp_disconnect(address)
|
|
||||||
|
|
||||||
For pairing, use pairing_mode parameter:
|
For pairing, use pairing_mode parameter:
|
||||||
- "elicit": Use MCP elicitation to request PIN from user (preferred)
|
- "elicit": Use MCP elicitation to request PIN from user (preferred)
|
||||||
- "interactive": Return awaiting status, then call bt_pair_confirm
|
- "interactive": Return awaiting status, then call bt_pair_confirm
|
||||||
@ -98,11 +56,8 @@ device.register_tools(mcp)
|
|||||||
audio.register_tools(mcp)
|
audio.register_tools(mcp)
|
||||||
hfp.register_tools(mcp)
|
hfp.register_tools(mcp)
|
||||||
ble.register_tools(mcp)
|
ble.register_tools(mcp)
|
||||||
gatt_server.register_tools(mcp)
|
|
||||||
bt_elm327_emu.register_tools(mcp)
|
|
||||||
monitor.register_tools(mcp)
|
monitor.register_tools(mcp)
|
||||||
obex.register_tools(mcp)
|
obex.register_tools(mcp)
|
||||||
spp.register_tools(mcp)
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
|||||||
@ -1,419 +0,0 @@
|
|||||||
"""SPP (Serial Port Profile) implementation for BlueZ.
|
|
||||||
|
|
||||||
Registers as an SPP endpoint via BlueZ ProfileManager1, providing raw
|
|
||||||
bidirectional serial byte streams over RFCOMM. When a remote device
|
|
||||||
connects, BlueZ hands us the RFCOMM file descriptor through the Profile1
|
|
||||||
D-Bus interface — the same pattern used by hfp_ag.py, minus the AT
|
|
||||||
command protocol layer.
|
|
||||||
|
|
||||||
Use cases: serial terminal to Arduino/ESP32, GPS receivers (NMEA),
|
|
||||||
legacy sensors, Bluetooth modems, any line-oriented protocol over
|
|
||||||
classic Bluetooth.
|
|
||||||
|
|
||||||
D-Bus flow:
|
|
||||||
1. RegisterProfile(SPP_UUID) with ProfileManager1
|
|
||||||
2. Remote connects → BlueZ calls NewConnection(device, fd, props)
|
|
||||||
3. We dup the fd, wrap in async streams, read/write raw bytes
|
|
||||||
4. RequestDisconnection or EOF → cleanup
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import logging
|
|
||||||
import os
|
|
||||||
import socket
|
|
||||||
import time
|
|
||||||
from collections import deque
|
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from datetime import UTC, datetime
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from dbus_fast import BusType, Variant
|
|
||||||
from dbus_fast.aio import MessageBus
|
|
||||||
from dbus_fast.service import ServiceInterface, method
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Bluetooth socket constants — Python often compiled without bluetooth.h
|
|
||||||
_AF_BLUETOOTH = getattr(socket, "AF_BLUETOOTH", 31)
|
|
||||||
_BTPROTO_RFCOMM = getattr(socket, "BTPROTO_RFCOMM", 3)
|
|
||||||
|
|
||||||
# D-Bus constants
|
|
||||||
BLUEZ_SERVICE = "org.bluez"
|
|
||||||
PROFILE_MANAGER_IFACE = "org.bluez.ProfileManager1"
|
|
||||||
SPP_UUID = "00001101-0000-1000-8000-00805f9b34fb"
|
|
||||||
SPP_PROFILE_PATH = "/mcbluetooth/spp"
|
|
||||||
|
|
||||||
|
|
||||||
def _try_decode(value: bytes) -> str | None:
|
|
||||||
"""Try to decode bytes as printable UTF-8."""
|
|
||||||
try:
|
|
||||||
decoded = value.decode("utf-8")
|
|
||||||
if all(c.isprintable() or c in "\r\n\t" for c in decoded):
|
|
||||||
return decoded
|
|
||||||
return None
|
|
||||||
except (UnicodeDecodeError, ValueError):
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def _path_to_address(device_path: str) -> str:
|
|
||||||
parts = device_path.split("/")
|
|
||||||
if len(parts) >= 5 and parts[-1].startswith("dev_"):
|
|
||||||
return parts[-1][4:].replace("_", ":")
|
|
||||||
return device_path
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class SPPDataEvent:
|
|
||||||
"""A chunk of data received from a remote SPP peer."""
|
|
||||||
|
|
||||||
index: int
|
|
||||||
timestamp: str
|
|
||||||
address: str
|
|
||||||
value: bytes
|
|
||||||
value_hex: str
|
|
||||||
value_string: str | None
|
|
||||||
|
|
||||||
def to_dict(self) -> dict[str, Any]:
|
|
||||||
d: dict[str, Any] = {
|
|
||||||
"index": self.index,
|
|
||||||
"timestamp": self.timestamp,
|
|
||||||
"address": self.address,
|
|
||||||
"value_hex": self.value_hex,
|
|
||||||
"length": len(self.value),
|
|
||||||
}
|
|
||||||
if self.value_string is not None:
|
|
||||||
d["value_string"] = self.value_string
|
|
||||||
return d
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class SPPConnection:
|
|
||||||
"""Per-peer connection state for an SPP session."""
|
|
||||||
|
|
||||||
device_path: str
|
|
||||||
address: str
|
|
||||||
role: str # "server" (they connected to us) or "client" (we connected to them)
|
|
||||||
fd: int
|
|
||||||
connected_at: float = field(default_factory=time.monotonic)
|
|
||||||
sock: socket.socket | None = None
|
|
||||||
reader: asyncio.StreamReader | None = None
|
|
||||||
writer: asyncio.StreamWriter | None = None
|
|
||||||
bytes_sent: int = 0
|
|
||||||
bytes_received: int = 0
|
|
||||||
_read_task: asyncio.Task | None = field(default=None, repr=False)
|
|
||||||
|
|
||||||
|
|
||||||
class SPPProfile(ServiceInterface):
|
|
||||||
"""D-Bus Profile1 service for SPP (Serial Port Profile)."""
|
|
||||||
|
|
||||||
def __init__(self) -> None:
|
|
||||||
super().__init__("org.bluez.Profile1")
|
|
||||||
self.connections: dict[str, SPPConnection] = {} # keyed by device_path
|
|
||||||
self._recv_events: deque[SPPDataEvent] = deque(maxlen=500)
|
|
||||||
self._recv_index: int = 0
|
|
||||||
self._recv_callback: Any = None # async callable(address, data)
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def Release(self) -> None:
|
|
||||||
log.info("SPP profile released")
|
|
||||||
for conn in list(self.connections.values()):
|
|
||||||
self._cleanup_connection(conn)
|
|
||||||
self.connections.clear()
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def NewConnection(self, device: "o", fd: "h", properties: "a{sv}") -> None:
|
|
||||||
address = _path_to_address(device)
|
|
||||||
log.debug("SPP NewConnection: device=%s fd=%r props=%s", device, fd, properties)
|
|
||||||
|
|
||||||
if fd is None or (isinstance(fd, int) and fd < 0):
|
|
||||||
log.error("SPP: invalid fd received: %r", fd)
|
|
||||||
return
|
|
||||||
|
|
||||||
log.info("SPP: NewConnection from %s (fd=%d)", address, fd)
|
|
||||||
|
|
||||||
# Duplicate the fd so we own it independent of dbus-fast
|
|
||||||
try:
|
|
||||||
new_fd = os.dup(fd)
|
|
||||||
log.debug("os.dup(%d) -> %d", fd, new_fd)
|
|
||||||
except OSError:
|
|
||||||
log.exception("SPP: os.dup(%d) failed for %s", fd, address)
|
|
||||||
return
|
|
||||||
|
|
||||||
conn = SPPConnection(
|
|
||||||
device_path=device,
|
|
||||||
address=address,
|
|
||||||
role="server",
|
|
||||||
fd=new_fd,
|
|
||||||
)
|
|
||||||
self.connections[device] = conn
|
|
||||||
|
|
||||||
loop = asyncio.get_event_loop()
|
|
||||||
conn._read_task = loop.create_task(self._handle_connection(conn))
|
|
||||||
log.debug("SPP NewConnection done, read task created for %s", address)
|
|
||||||
|
|
||||||
@method()
|
|
||||||
def RequestDisconnection(self, device: "o") -> None:
|
|
||||||
address = _path_to_address(device)
|
|
||||||
log.info("SPP: disconnect requested for %s", address)
|
|
||||||
conn = self.connections.pop(device, None)
|
|
||||||
if conn:
|
|
||||||
self._cleanup_connection(conn)
|
|
||||||
|
|
||||||
def _cleanup_connection(self, conn: SPPConnection) -> None:
|
|
||||||
if conn._read_task and not conn._read_task.done():
|
|
||||||
conn._read_task.cancel()
|
|
||||||
if conn.writer:
|
|
||||||
try:
|
|
||||||
conn.writer.close()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
if conn.sock:
|
|
||||||
try:
|
|
||||||
conn.sock.close()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
elif conn.fd >= 0:
|
|
||||||
try:
|
|
||||||
os.close(conn.fd)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
async def _handle_connection(self, conn: SPPConnection) -> None:
|
|
||||||
"""Read loop for raw bytes from the remote SPP peer."""
|
|
||||||
try:
|
|
||||||
log.debug("SPP _handle_connection start: addr=%s fd=%d", conn.address, conn.fd)
|
|
||||||
|
|
||||||
# socket.fromfd() dups the fd internally — close our intermediate copy
|
|
||||||
conn.sock = socket.fromfd(
|
|
||||||
conn.fd, _AF_BLUETOOTH, socket.SOCK_STREAM,
|
|
||||||
_BTPROTO_RFCOMM,
|
|
||||||
)
|
|
||||||
log.debug("socket.fromfd OK: fileno=%d", conn.sock.fileno())
|
|
||||||
try:
|
|
||||||
os.close(conn.fd)
|
|
||||||
except OSError:
|
|
||||||
pass
|
|
||||||
conn.fd = -1 # transferred to socket
|
|
||||||
|
|
||||||
conn.sock.setblocking(False)
|
|
||||||
conn.reader, conn.writer = await asyncio.open_connection(sock=conn.sock)
|
|
||||||
log.debug("SPP async streams ready for %s, entering read loop", conn.address)
|
|
||||||
|
|
||||||
while True:
|
|
||||||
data = await conn.reader.read(4096)
|
|
||||||
if not data:
|
|
||||||
log.debug("SPP: EOF from %s (clean disconnect)", conn.address)
|
|
||||||
break
|
|
||||||
log.debug("SPP recv %d bytes from %s: %r", len(data), conn.address, data[:80])
|
|
||||||
conn.bytes_received += len(data)
|
|
||||||
|
|
||||||
event = SPPDataEvent(
|
|
||||||
index=self._recv_index,
|
|
||||||
timestamp=datetime.now(UTC).isoformat(),
|
|
||||||
address=conn.address,
|
|
||||||
value=data,
|
|
||||||
value_hex=data.hex(),
|
|
||||||
value_string=_try_decode(data),
|
|
||||||
)
|
|
||||||
self._recv_events.append(event)
|
|
||||||
self._recv_index += 1
|
|
||||||
|
|
||||||
# Fire async callback if registered (auto-responders)
|
|
||||||
if self._recv_callback:
|
|
||||||
try:
|
|
||||||
loop = asyncio.get_running_loop()
|
|
||||||
loop.create_task(self._recv_callback(conn.address, data))
|
|
||||||
except RuntimeError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
except (ConnectionResetError, BrokenPipeError, OSError) as e:
|
|
||||||
log.debug("SPP connection error for %s: %s: %s", conn.address, type(e).__name__, e)
|
|
||||||
except asyncio.CancelledError:
|
|
||||||
log.debug("SPP task cancelled for %s", conn.address)
|
|
||||||
except Exception:
|
|
||||||
log.exception("SPP UNEXPECTED error for %s", conn.address)
|
|
||||||
finally:
|
|
||||||
log.debug("SPP cleanup for %s", conn.address)
|
|
||||||
self.connections.pop(conn.device_path, None)
|
|
||||||
self._cleanup_connection(conn)
|
|
||||||
|
|
||||||
async def send(self, address: str, data: bytes) -> bool:
|
|
||||||
"""Send raw bytes to a connected SPP peer."""
|
|
||||||
conn = self._get_connection(address)
|
|
||||||
if not conn or not conn.writer or conn.writer.is_closing():
|
|
||||||
return False
|
|
||||||
conn.writer.write(data)
|
|
||||||
await conn.writer.drain()
|
|
||||||
conn.bytes_sent += len(data)
|
|
||||||
log.debug("SPP sent %d bytes to %s", len(data), address)
|
|
||||||
return True
|
|
||||||
|
|
||||||
def add_client_connection(self, device_path: str, address: str, fd: int) -> None:
|
|
||||||
"""Register a client-initiated connection (we connected to them).
|
|
||||||
|
|
||||||
Called after Device1.ConnectProfile triggers NewConnection on our
|
|
||||||
Profile1 handler. In client mode, BlueZ still delivers the fd via
|
|
||||||
NewConnection — we just tag the role as "client" before the read
|
|
||||||
loop starts. However, BlueZ may have already fired NewConnection
|
|
||||||
by the time ConnectProfile returns, so this is a fallback to
|
|
||||||
retag if needed.
|
|
||||||
"""
|
|
||||||
conn = self.connections.get(device_path)
|
|
||||||
if conn:
|
|
||||||
conn.role = "client"
|
|
||||||
|
|
||||||
def _get_connection(self, address: str) -> SPPConnection | None:
|
|
||||||
for conn in self.connections.values():
|
|
||||||
if conn.address.upper() == address.upper():
|
|
||||||
return conn
|
|
||||||
return None
|
|
||||||
|
|
||||||
def get_recv_events(
|
|
||||||
self,
|
|
||||||
since_index: int = 0,
|
|
||||||
address: str | None = None,
|
|
||||||
limit: int = 50,
|
|
||||||
) -> list[dict[str, Any]]:
|
|
||||||
"""Get received data events, optionally filtered."""
|
|
||||||
events = [
|
|
||||||
e
|
|
||||||
for e in self._recv_events
|
|
||||||
if e.index >= since_index
|
|
||||||
and (address is None or e.address.upper() == address.upper())
|
|
||||||
]
|
|
||||||
return [e.to_dict() for e in events[:limit]]
|
|
||||||
|
|
||||||
def clear_recv_events(self) -> int:
|
|
||||||
"""Clear all received data events. Returns count cleared."""
|
|
||||||
count = len(self._recv_events)
|
|
||||||
self._recv_events.clear()
|
|
||||||
return count
|
|
||||||
|
|
||||||
def get_status(self) -> dict[str, Any]:
|
|
||||||
"""Get overall SPP status."""
|
|
||||||
conns = []
|
|
||||||
now = time.monotonic()
|
|
||||||
for conn in self.connections.values():
|
|
||||||
conns.append({
|
|
||||||
"address": conn.address,
|
|
||||||
"role": conn.role,
|
|
||||||
"duration_seconds": round(now - conn.connected_at, 1),
|
|
||||||
"bytes_sent": conn.bytes_sent,
|
|
||||||
"bytes_received": conn.bytes_received,
|
|
||||||
})
|
|
||||||
return {
|
|
||||||
"registered": _profile_registered,
|
|
||||||
"uuid": _registered_uuid or SPP_UUID,
|
|
||||||
"connections": conns,
|
|
||||||
"recv_buffer_count": len(self._recv_events),
|
|
||||||
"recv_buffer_total": self._recv_index,
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
# ==================== Module-level lifecycle ====================
|
|
||||||
|
|
||||||
_profile: SPPProfile | None = None
|
|
||||||
_profile_bus: MessageBus | None = None
|
|
||||||
_profile_registered: bool = False
|
|
||||||
_registered_uuid: str | None = None
|
|
||||||
|
|
||||||
|
|
||||||
async def enable_spp(
|
|
||||||
uuid: str = SPP_UUID,
|
|
||||||
channel: int = 0,
|
|
||||||
name: str = "mcbluetooth SPP",
|
|
||||||
) -> SPPProfile:
|
|
||||||
"""Register the SPP profile with BlueZ.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
uuid: Service UUID. Default is standard SPP UUID. Use custom UUIDs
|
|
||||||
for Arduino/ESP32 devices that advertise non-standard RFCOMM.
|
|
||||||
channel: RFCOMM channel number (0 = auto-assign).
|
|
||||||
name: Profile display name.
|
|
||||||
"""
|
|
||||||
global _profile, _profile_bus, _profile_registered, _registered_uuid
|
|
||||||
|
|
||||||
if _profile_registered and _profile:
|
|
||||||
return _profile
|
|
||||||
|
|
||||||
if _profile is None:
|
|
||||||
_profile = SPPProfile()
|
|
||||||
|
|
||||||
if _profile_bus is None:
|
|
||||||
_profile_bus = await MessageBus(
|
|
||||||
bus_type=BusType.SYSTEM,
|
|
||||||
negotiate_unix_fd=True, # Required: BlueZ passes RFCOMM fd via D-Bus
|
|
||||||
).connect()
|
|
||||||
log.debug("SPP D-Bus connected: negotiate_unix_fd=%s unique_name=%s",
|
|
||||||
_profile_bus._negotiate_unix_fd, _profile_bus.unique_name)
|
|
||||||
_profile_bus.export(SPP_PROFILE_PATH, _profile)
|
|
||||||
|
|
||||||
# Register with ProfileManager1
|
|
||||||
introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
|
|
||||||
proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection)
|
|
||||||
profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE)
|
|
||||||
|
|
||||||
options: dict[str, Variant] = {
|
|
||||||
"Name": Variant("s", name),
|
|
||||||
"Role": Variant("s", "client-server"),
|
|
||||||
}
|
|
||||||
if channel > 0:
|
|
||||||
options["Channel"] = Variant("q", channel)
|
|
||||||
|
|
||||||
try:
|
|
||||||
await profile_mgr.call_register_profile(
|
|
||||||
SPP_PROFILE_PATH,
|
|
||||||
uuid,
|
|
||||||
options,
|
|
||||||
)
|
|
||||||
_profile_registered = True
|
|
||||||
_registered_uuid = uuid
|
|
||||||
log.info("SPP profile registered with BlueZ (uuid=%s, channel=%s)", uuid, channel or "auto")
|
|
||||||
except Exception as e:
|
|
||||||
if "Already Exists" in str(e):
|
|
||||||
log.info("SPP profile stale — unregistering and re-registering")
|
|
||||||
try:
|
|
||||||
await profile_mgr.call_unregister_profile(SPP_PROFILE_PATH)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
await profile_mgr.call_register_profile(
|
|
||||||
SPP_PROFILE_PATH,
|
|
||||||
uuid,
|
|
||||||
options,
|
|
||||||
)
|
|
||||||
_profile_registered = True
|
|
||||||
_registered_uuid = uuid
|
|
||||||
log.info("SPP profile re-registered with BlueZ")
|
|
||||||
else:
|
|
||||||
raise
|
|
||||||
|
|
||||||
return _profile
|
|
||||||
|
|
||||||
|
|
||||||
async def disable_spp() -> None:
|
|
||||||
"""Unregister the SPP profile and close all connections."""
|
|
||||||
global _profile, _profile_bus, _profile_registered, _registered_uuid
|
|
||||||
|
|
||||||
if _profile_bus and _profile_registered:
|
|
||||||
try:
|
|
||||||
introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
|
|
||||||
proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection)
|
|
||||||
profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE)
|
|
||||||
await profile_mgr.call_unregister_profile(SPP_PROFILE_PATH)
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
_profile_registered = False
|
|
||||||
_registered_uuid = None
|
|
||||||
|
|
||||||
if _profile:
|
|
||||||
_profile.Release()
|
|
||||||
_profile = None
|
|
||||||
|
|
||||||
if _profile_bus:
|
|
||||||
_profile_bus.disconnect()
|
|
||||||
_profile_bus = None
|
|
||||||
|
|
||||||
|
|
||||||
async def get_spp() -> SPPProfile | None:
|
|
||||||
"""Get the current SPP profile instance (None if not enabled)."""
|
|
||||||
return _profile
|
|
||||||
@ -1,681 +0,0 @@
|
|||||||
"""ELM327 OBD-II BLE Emulator.
|
|
||||||
|
|
||||||
Emulates an ELM327 v1.5 adapter over BLE using Nordic UART Service (NUS).
|
|
||||||
OBD-II apps (Torque, Car Scanner, etc.) connect via BLE, send AT commands
|
|
||||||
and OBD-II PID queries, and receive simulated responses.
|
|
||||||
|
|
||||||
The emulator handles the full ELM327 protocol:
|
|
||||||
- AT command set (ATZ, ATE0, ATH1, ATSP, etc.)
|
|
||||||
- Mode 01 (live data) with configurable PID values
|
|
||||||
- Mode 03 (stored DTCs)
|
|
||||||
- Mode 09 (VIN query)
|
|
||||||
- Supported PID bitmaps auto-generated from configured PIDs
|
|
||||||
|
|
||||||
NUS UUIDs (same as real BLE OBD adapters):
|
|
||||||
Service: 6E400001-B5A3-F393-E0A9-E50E24DCCA9E
|
|
||||||
RX (write): 6E400002-... (client writes commands here)
|
|
||||||
TX (notify): 6E400003-... (server sends responses here)
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
import logging
|
|
||||||
from collections import deque
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from datetime import UTC, datetime
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastmcp import FastMCP
|
|
||||||
|
|
||||||
from mcbluetooth.gatt_server import get_gatt_server
|
|
||||||
|
|
||||||
log = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# Nordic UART Service UUIDs
|
|
||||||
NUS_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
|
|
||||||
NUS_RX_CHAR_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e" # Client writes here
|
|
||||||
NUS_TX_CHAR_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e" # Server notifies here
|
|
||||||
|
|
||||||
# Default ECU header (primary ECU response address)
|
|
||||||
ECU_HEADER = "7E8"
|
|
||||||
|
|
||||||
# Common OBD-II PIDs with default values
|
|
||||||
DEFAULT_PIDS: dict[int, tuple[bytes, str]] = {
|
|
||||||
0x05: (bytes([70]), "Coolant temp (°C, offset -40) → 30°C"),
|
|
||||||
0x0C: (bytes([0x0B, 0xB8]), "Engine RPM (× ¼) → 750 RPM"),
|
|
||||||
0x0D: (bytes([0]), "Vehicle speed (km/h) → 0"),
|
|
||||||
0x0F: (bytes([55]), "Intake air temp (°C, offset -40) → 15°C"),
|
|
||||||
0x11: (bytes([25]), "Throttle position (%) → ~10%"),
|
|
||||||
0x1F: (bytes([0x00, 0x00]), "Runtime since start (sec) → 0"),
|
|
||||||
0x2F: (bytes([51]), "Fuel tank level (%) → 20%"),
|
|
||||||
0x46: (bytes([60]), "Ambient air temp (°C, offset -40) → 20°C"),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class CommandRecord:
|
|
||||||
"""A command received from the OBD-II client."""
|
|
||||||
|
|
||||||
index: int
|
|
||||||
timestamp: str
|
|
||||||
raw: str
|
|
||||||
parsed: str
|
|
||||||
response: str
|
|
||||||
|
|
||||||
|
|
||||||
class ELM327Emulator:
|
|
||||||
"""ELM327 AT command and OBD-II protocol handler."""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.device_id = "ELM327 v1.5"
|
|
||||||
self.protocol = "6" # ISO 15765-4 CAN (11/500)
|
|
||||||
self.echo = False
|
|
||||||
self.headers = True
|
|
||||||
self.spaces = False
|
|
||||||
self.linefeed = False
|
|
||||||
self.pid_values: dict[int, bytes] = {}
|
|
||||||
self.dtc_codes: list[str] = []
|
|
||||||
self.vin = "1MCBT00TESTING12345"
|
|
||||||
self._commands: deque[CommandRecord] = deque(maxlen=200)
|
|
||||||
self._cmd_index = 0
|
|
||||||
self._started = False
|
|
||||||
self._rx_char_id: str | None = None
|
|
||||||
self._tx_char_id: str | None = None
|
|
||||||
self._adapter: str | None = None
|
|
||||||
self._buf = "" # Accumulates partial writes
|
|
||||||
|
|
||||||
def set_defaults(self) -> None:
|
|
||||||
"""Load default PID values for a realistic idle vehicle."""
|
|
||||||
for pid, (value, _desc) in DEFAULT_PIDS.items():
|
|
||||||
self.pid_values[pid] = value
|
|
||||||
|
|
||||||
def handle_data(self, data: bytes) -> str | None:
|
|
||||||
"""Process incoming bytes, return response when a complete command is received.
|
|
||||||
|
|
||||||
ELM327 commands are terminated with CR (\\r). Accumulates partial
|
|
||||||
writes until a complete command is received.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
text = data.decode("utf-8", errors="replace")
|
|
||||||
except Exception:
|
|
||||||
return None
|
|
||||||
|
|
||||||
self._buf += text
|
|
||||||
|
|
||||||
# Check for command terminator
|
|
||||||
if "\r" not in self._buf:
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Split on CR, process first complete command
|
|
||||||
line, self._buf = self._buf.split("\r", 1)
|
|
||||||
line = line.strip()
|
|
||||||
if not line:
|
|
||||||
return None
|
|
||||||
|
|
||||||
response = self._process_command(line)
|
|
||||||
|
|
||||||
# Record command
|
|
||||||
record = CommandRecord(
|
|
||||||
index=self._cmd_index,
|
|
||||||
timestamp=datetime.now(UTC).isoformat(),
|
|
||||||
raw=line,
|
|
||||||
parsed=line.upper().strip(),
|
|
||||||
response=response.replace("\r", "\\r").replace("\n", "\\n"),
|
|
||||||
)
|
|
||||||
self._commands.append(record)
|
|
||||||
self._cmd_index += 1
|
|
||||||
|
|
||||||
return response
|
|
||||||
|
|
||||||
def _process_command(self, line: str) -> str:
|
|
||||||
"""Parse and handle an AT or OBD command."""
|
|
||||||
cmd = line.strip().upper()
|
|
||||||
|
|
||||||
if cmd.startswith("AT"):
|
|
||||||
return self._handle_at(cmd)
|
|
||||||
|
|
||||||
# OBD-II query
|
|
||||||
return self._handle_obd(cmd)
|
|
||||||
|
|
||||||
def _handle_at(self, cmd: str) -> str:
|
|
||||||
"""Handle ELM327 AT commands."""
|
|
||||||
cr = "\r\n" if self.linefeed else "\r"
|
|
||||||
|
|
||||||
if cmd == "ATZ":
|
|
||||||
self.echo = False
|
|
||||||
self.headers = True
|
|
||||||
self.spaces = False
|
|
||||||
return f"{cr}{self.device_id}{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATI":
|
|
||||||
return f"{self.device_id}{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "AT@1":
|
|
||||||
return f"mcbluetooth OBD-II Emulator{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATE0":
|
|
||||||
self.echo = False
|
|
||||||
return f"OK{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATE1":
|
|
||||||
self.echo = True
|
|
||||||
return f"OK{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATH0":
|
|
||||||
self.headers = False
|
|
||||||
return f"OK{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATH1":
|
|
||||||
self.headers = True
|
|
||||||
return f"OK{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATS0":
|
|
||||||
self.spaces = False
|
|
||||||
return f"OK{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATS1":
|
|
||||||
self.spaces = True
|
|
||||||
return f"OK{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd.startswith("ATL"):
|
|
||||||
self.linefeed = cmd.endswith("1")
|
|
||||||
cr = "\r\n" if self.linefeed else "\r"
|
|
||||||
return f"OK{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATDPN":
|
|
||||||
return f"A{self.protocol}{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATDP":
|
|
||||||
protocols = {
|
|
||||||
"0": "AUTO",
|
|
||||||
"6": "ISO 15765-4 (CAN 11/500)",
|
|
||||||
"7": "ISO 15765-4 (CAN 29/500)",
|
|
||||||
"8": "ISO 15765-4 (CAN 11/250)",
|
|
||||||
"9": "ISO 15765-4 (CAN 29/250)",
|
|
||||||
}
|
|
||||||
name = protocols.get(self.protocol, "AUTO")
|
|
||||||
return f"{name}{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "ATRV":
|
|
||||||
return f"12.6V{cr}{cr}>"
|
|
||||||
|
|
||||||
# Commands that just get OK
|
|
||||||
ok_prefixes = (
|
|
||||||
"ATSP",
|
|
||||||
"ATST",
|
|
||||||
"ATAT",
|
|
||||||
"ATM",
|
|
||||||
"ATCAF",
|
|
||||||
"ATD",
|
|
||||||
"ATCE",
|
|
||||||
"ATCF",
|
|
||||||
"ATCM",
|
|
||||||
"ATSH",
|
|
||||||
"ATFC",
|
|
||||||
"ATWM",
|
|
||||||
"ATMA",
|
|
||||||
"ATAR",
|
|
||||||
"ATPC",
|
|
||||||
)
|
|
||||||
for prefix in ok_prefixes:
|
|
||||||
if cmd.startswith(prefix):
|
|
||||||
return f"OK{cr}{cr}>"
|
|
||||||
|
|
||||||
if cmd == "AT":
|
|
||||||
return f"OK{cr}{cr}>"
|
|
||||||
|
|
||||||
return f"?{cr}{cr}>"
|
|
||||||
|
|
||||||
def _handle_obd(self, cmd: str) -> str:
|
|
||||||
"""Handle OBD-II service queries."""
|
|
||||||
cr = "\r\n" if self.linefeed else "\r"
|
|
||||||
cmd = cmd.replace(" ", "")
|
|
||||||
|
|
||||||
# Modes 03/04 don't require a PID — pad to 4 chars
|
|
||||||
if len(cmd) == 2:
|
|
||||||
cmd += "00"
|
|
||||||
elif len(cmd) < 4:
|
|
||||||
return f"?{cr}{cr}>"
|
|
||||||
|
|
||||||
try:
|
|
||||||
mode = int(cmd[:2], 16)
|
|
||||||
pid = int(cmd[2:4], 16)
|
|
||||||
except ValueError:
|
|
||||||
return f"?{cr}{cr}>"
|
|
||||||
|
|
||||||
if mode == 0x01:
|
|
||||||
return self._handle_mode01(pid) + f"{cr}>"
|
|
||||||
if mode == 0x03:
|
|
||||||
return self._handle_mode03() + f"{cr}>"
|
|
||||||
if mode == 0x04:
|
|
||||||
self.dtc_codes.clear()
|
|
||||||
return f"44{cr}{cr}>"
|
|
||||||
if mode == 0x09:
|
|
||||||
return self._handle_mode09(pid) + f"{cr}>"
|
|
||||||
|
|
||||||
return f"NO DATA{cr}{cr}>"
|
|
||||||
|
|
||||||
def _handle_mode01(self, pid: int) -> str:
|
|
||||||
"""Handle Mode 01 (current data) queries."""
|
|
||||||
cr = "\r\n" if self.linefeed else "\r"
|
|
||||||
|
|
||||||
# Supported PID bitmaps
|
|
||||||
if pid in (0x00, 0x20, 0x40, 0x60, 0x80, 0xA0, 0xC0, 0xE0):
|
|
||||||
bitmap = self._build_pid_bitmap(pid)
|
|
||||||
return self._format_response(0x41, pid, bitmap)
|
|
||||||
|
|
||||||
# Configured PID value
|
|
||||||
if pid in self.pid_values:
|
|
||||||
return self._format_response(0x41, pid, self.pid_values[pid])
|
|
||||||
|
|
||||||
return f"NO DATA{cr}"
|
|
||||||
|
|
||||||
def _handle_mode03(self) -> str:
|
|
||||||
"""Handle Mode 03 (stored DTCs) query."""
|
|
||||||
if not self.dtc_codes:
|
|
||||||
return self._format_response(0x43, 0x00, bytes([0x00, 0x00, 0x00, 0x00]))
|
|
||||||
|
|
||||||
# Pack DTCs into response (2 bytes each)
|
|
||||||
dtc_bytes = bytearray()
|
|
||||||
for code in self.dtc_codes[:3]: # Max 3 per frame
|
|
||||||
dtc_bytes.extend(self._encode_dtc(code))
|
|
||||||
|
|
||||||
# Pad to even number of DTCs
|
|
||||||
while len(dtc_bytes) % 4 != 0:
|
|
||||||
dtc_bytes.extend(b"\x00\x00")
|
|
||||||
|
|
||||||
return self._format_response(0x43, len(self.dtc_codes), bytes(dtc_bytes))
|
|
||||||
|
|
||||||
def _handle_mode09(self, pid: int) -> str:
|
|
||||||
"""Handle Mode 09 (vehicle info) queries."""
|
|
||||||
cr = "\r\n" if self.linefeed else "\r"
|
|
||||||
|
|
||||||
if pid == 0x00:
|
|
||||||
# Supported Mode 09 PIDs: 02 (VIN)
|
|
||||||
return self._format_response(0x49, 0x00, bytes([0x40, 0x00, 0x00, 0x00]))
|
|
||||||
|
|
||||||
if pid == 0x02:
|
|
||||||
# VIN (17 chars, multi-frame)
|
|
||||||
vin_bytes = self.vin.encode("ascii")[:17].ljust(17, b"0")
|
|
||||||
if self.headers:
|
|
||||||
lines = []
|
|
||||||
# First frame: 10 14 49 02 01 VIN[0:3]
|
|
||||||
lines.append(
|
|
||||||
f"{ECU_HEADER} 10 14 49 02 01 " + " ".join(f"{b:02X}" for b in vin_bytes[:3])
|
|
||||||
)
|
|
||||||
# Consecutive frames
|
|
||||||
offset = 3
|
|
||||||
seq = 1
|
|
||||||
while offset < len(vin_bytes):
|
|
||||||
chunk = vin_bytes[offset : offset + 7]
|
|
||||||
frame = f"{ECU_HEADER} 2{seq:01X} " + " ".join(f"{b:02X}" for b in chunk)
|
|
||||||
lines.append(frame)
|
|
||||||
offset += 7
|
|
||||||
seq = (seq + 1) & 0x0F
|
|
||||||
return cr.join(lines) + cr
|
|
||||||
else:
|
|
||||||
return f"4902{vin_bytes.hex().upper()}{cr}"
|
|
||||||
|
|
||||||
return f"NO DATA{cr}"
|
|
||||||
|
|
||||||
def _format_response(self, mode_resp: int, pid: int, data: bytes) -> str:
|
|
||||||
"""Format an OBD-II response with optional headers."""
|
|
||||||
cr = "\r\n" if self.linefeed else "\r"
|
|
||||||
length = 2 + len(data) # mode + pid + data
|
|
||||||
|
|
||||||
if self.headers:
|
|
||||||
sep = " " if self.spaces else ""
|
|
||||||
data_hex = sep.join(f"{b:02X}" for b in data)
|
|
||||||
return f"{ECU_HEADER}{sep}{length:02X}{sep}{mode_resp:02X}{sep}{pid:02X}{sep}{data_hex}{cr}"
|
|
||||||
else:
|
|
||||||
data_hex = "".join(f"{b:02X}" for b in data)
|
|
||||||
return f"{mode_resp:02X}{pid:02X}{data_hex}{cr}"
|
|
||||||
|
|
||||||
def _build_pid_bitmap(self, base_pid: int) -> bytes:
|
|
||||||
"""Build a 4-byte bitmap of supported PIDs for a range."""
|
|
||||||
bitmap = 0
|
|
||||||
all_pids = set(self.pid_values.keys())
|
|
||||||
|
|
||||||
for i in range(32):
|
|
||||||
actual_pid = base_pid + i + 1
|
|
||||||
if actual_pid in all_pids:
|
|
||||||
bitmap |= 1 << (31 - i)
|
|
||||||
|
|
||||||
# Set bit 32 (= bit 0) if the next range has supported PIDs
|
|
||||||
next_base = base_pid + 0x20
|
|
||||||
if next_base <= 0xE0:
|
|
||||||
for p in all_pids:
|
|
||||||
if next_base < p <= next_base + 0x20:
|
|
||||||
bitmap |= 1
|
|
||||||
break
|
|
||||||
|
|
||||||
return bitmap.to_bytes(4, "big")
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def _encode_dtc(code: str) -> bytes:
|
|
||||||
"""Encode a DTC string like 'P0301' into 2 bytes."""
|
|
||||||
prefixes = {"P": 0, "C": 1, "B": 2, "U": 3}
|
|
||||||
prefix = prefixes.get(code[0].upper(), 0)
|
|
||||||
number = int(code[1:], 16)
|
|
||||||
value = (prefix << 14) | number
|
|
||||||
return value.to_bytes(2, "big")
|
|
||||||
|
|
||||||
def get_commands(
|
|
||||||
self,
|
|
||||||
since_index: int = 0,
|
|
||||||
limit: int = 50,
|
|
||||||
) -> list[dict[str, Any]]:
|
|
||||||
"""Get recorded commands."""
|
|
||||||
records = [
|
|
||||||
{
|
|
||||||
"index": r.index,
|
|
||||||
"timestamp": r.timestamp,
|
|
||||||
"command": r.raw,
|
|
||||||
"response_preview": r.response[:80],
|
|
||||||
}
|
|
||||||
for r in self._commands
|
|
||||||
if r.index >= since_index
|
|
||||||
]
|
|
||||||
return records[:limit]
|
|
||||||
|
|
||||||
|
|
||||||
# ==================== Module-level emulator ====================
|
|
||||||
|
|
||||||
_emulator: ELM327Emulator | None = None
|
|
||||||
|
|
||||||
|
|
||||||
def _get_emulator() -> ELM327Emulator:
|
|
||||||
"""Get the global emulator instance."""
|
|
||||||
global _emulator
|
|
||||||
if _emulator is None:
|
|
||||||
_emulator = ELM327Emulator()
|
|
||||||
return _emulator
|
|
||||||
|
|
||||||
|
|
||||||
async def _on_write(char_id: str, value: bytes, device: str | None) -> None:
|
|
||||||
"""Callback fired when a remote client writes to the RX characteristic."""
|
|
||||||
emu = _get_emulator()
|
|
||||||
if not emu._started or char_id != emu._rx_char_id:
|
|
||||||
return
|
|
||||||
|
|
||||||
response = emu.handle_data(value)
|
|
||||||
if response and emu._tx_char_id:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
mgr.set_value(emu._tx_char_id, response.encode("utf-8"))
|
|
||||||
|
|
||||||
|
|
||||||
# ==================== Tool registration ====================
|
|
||||||
|
|
||||||
|
|
||||||
def register_tools(mcp: FastMCP) -> None:
|
|
||||||
"""Register ELM327 emulator tools with the MCP server."""
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_elm327_emu_start(
|
|
||||||
adapter: str,
|
|
||||||
name: str = "OBDII",
|
|
||||||
load_defaults: bool = True,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Start the ELM327 OBD-II BLE emulator.
|
|
||||||
|
|
||||||
Sets up a Nordic UART Service (NUS) GATT server, registers with
|
|
||||||
BlueZ, and starts advertising. OBD-II apps can then discover and
|
|
||||||
connect to this emulated adapter.
|
|
||||||
|
|
||||||
The emulator auto-responds to AT commands and OBD-II PID queries.
|
|
||||||
Configure PID values with bt_elm327_emu_set_pid().
|
|
||||||
|
|
||||||
Args:
|
|
||||||
adapter: Bluetooth adapter (e.g., "hci0").
|
|
||||||
name: BLE device name shown to scanners (default "OBDII").
|
|
||||||
load_defaults: Pre-load default PID values for a realistic
|
|
||||||
idle vehicle (RPM 750, coolant 30°C, etc.).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status with NUS characteristic IDs and default PID info.
|
|
||||||
"""
|
|
||||||
emu = _get_emulator()
|
|
||||||
if emu._started:
|
|
||||||
return {"status": "error", "error": "Emulator already running"}
|
|
||||||
|
|
||||||
try:
|
|
||||||
if load_defaults:
|
|
||||||
emu.set_defaults()
|
|
||||||
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
|
|
||||||
# Build NUS GATT hierarchy
|
|
||||||
svc_id = mgr.add_service(NUS_SERVICE_UUID)
|
|
||||||
rx_id = mgr.add_characteristic(
|
|
||||||
svc_id,
|
|
||||||
NUS_RX_CHAR_UUID,
|
|
||||||
flags=["write", "write-without-response"],
|
|
||||||
)
|
|
||||||
tx_id = mgr.add_characteristic(
|
|
||||||
svc_id,
|
|
||||||
NUS_TX_CHAR_UUID,
|
|
||||||
flags=["notify"],
|
|
||||||
value=f"{emu.device_id}\r\n>".encode(),
|
|
||||||
)
|
|
||||||
|
|
||||||
emu._rx_char_id = rx_id
|
|
||||||
emu._tx_char_id = tx_id
|
|
||||||
emu._adapter = adapter
|
|
||||||
|
|
||||||
# Register write callback for auto-response
|
|
||||||
mgr._write_callback = _on_write
|
|
||||||
|
|
||||||
# Register + advertise
|
|
||||||
await mgr.register(adapter)
|
|
||||||
await mgr.set_advertising(
|
|
||||||
adapter,
|
|
||||||
True,
|
|
||||||
name=name,
|
|
||||||
service_uuids=[NUS_SERVICE_UUID],
|
|
||||||
)
|
|
||||||
|
|
||||||
emu._started = True
|
|
||||||
|
|
||||||
pid_info = (
|
|
||||||
{pid: desc for pid, (_val, desc) in DEFAULT_PIDS.items()} if load_defaults else {}
|
|
||||||
)
|
|
||||||
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"emulator": "ELM327 v1.5",
|
|
||||||
"adapter": adapter,
|
|
||||||
"name": name,
|
|
||||||
"rx_char_id": rx_id,
|
|
||||||
"tx_char_id": tx_id,
|
|
||||||
"default_pids": pid_info,
|
|
||||||
"vin": emu.vin,
|
|
||||||
}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_elm327_emu_stop(adapter: str) -> dict[str, Any]:
|
|
||||||
"""Stop the ELM327 OBD-II BLE emulator.
|
|
||||||
|
|
||||||
Stops advertising, unregisters GATT services, and cleans up.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
adapter: Bluetooth adapter (e.g., "hci0").
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status confirming shutdown.
|
|
||||||
"""
|
|
||||||
global _emulator
|
|
||||||
emu = _get_emulator()
|
|
||||||
|
|
||||||
if not emu._started:
|
|
||||||
return {"status": "ok", "was_running": False}
|
|
||||||
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
mgr._write_callback = None
|
|
||||||
await mgr.unregister(adapter)
|
|
||||||
mgr.clear()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
|
|
||||||
emu._started = False
|
|
||||||
_emulator = None
|
|
||||||
return {"status": "ok", "was_running": True}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_elm327_emu_status() -> dict[str, Any]:
|
|
||||||
"""Get ELM327 emulator status.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
- running: Whether the emulator is active
|
|
||||||
- pids: Configured PID values with descriptions
|
|
||||||
- dtcs: Active diagnostic trouble codes
|
|
||||||
- command_count: Number of commands received
|
|
||||||
- settings: Current ELM327 settings (echo, headers, etc.)
|
|
||||||
"""
|
|
||||||
emu = _get_emulator()
|
|
||||||
|
|
||||||
pids = {}
|
|
||||||
for pid, value in emu.pid_values.items():
|
|
||||||
desc = ""
|
|
||||||
if pid in DEFAULT_PIDS:
|
|
||||||
_, desc = DEFAULT_PIDS[pid]
|
|
||||||
pids[f"0x{pid:02X}"] = {
|
|
||||||
"value_hex": value.hex(),
|
|
||||||
"value_bytes": list(value),
|
|
||||||
"description": desc,
|
|
||||||
}
|
|
||||||
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"running": emu._started,
|
|
||||||
"device_id": emu.device_id,
|
|
||||||
"adapter": emu._adapter,
|
|
||||||
"protocol": emu.protocol,
|
|
||||||
"pids": pids,
|
|
||||||
"dtcs": emu.dtc_codes,
|
|
||||||
"vin": emu.vin,
|
|
||||||
"command_count": emu._cmd_index,
|
|
||||||
"settings": {
|
|
||||||
"echo": emu.echo,
|
|
||||||
"headers": emu.headers,
|
|
||||||
"spaces": emu.spaces,
|
|
||||||
"linefeed": emu.linefeed,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_elm327_emu_set_pid(
|
|
||||||
pid: int,
|
|
||||||
value: str,
|
|
||||||
value_type: str = "hex",
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Set an OBD-II PID response value.
|
|
||||||
|
|
||||||
When an OBD app queries this PID (Mode 01), the emulator returns
|
|
||||||
the configured value. Values follow OBD-II encoding conventions.
|
|
||||||
|
|
||||||
Common PIDs:
|
|
||||||
0x05: Coolant temp (1 byte, value - 40 = °C)
|
|
||||||
0x0C: Engine RPM (2 bytes, value / 4 = RPM)
|
|
||||||
0x0D: Vehicle speed (1 byte = km/h)
|
|
||||||
0x0F: Intake air temp (1 byte, value - 40 = °C)
|
|
||||||
0x11: Throttle position (1 byte, value / 2.55 = %)
|
|
||||||
0x2F: Fuel tank level (1 byte, value / 2.55 = %)
|
|
||||||
0x46: Ambient air temp (1 byte, value - 40 = °C)
|
|
||||||
|
|
||||||
Args:
|
|
||||||
pid: PID number (e.g., 0x0C for RPM).
|
|
||||||
value: Encoded value. For RPM of 3000: "0BB8" (hex) or "3000" (int).
|
|
||||||
value_type: "hex" (raw bytes), "int" (little-endian integer),
|
|
||||||
or "string" (UTF-8).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status with the configured PID value.
|
|
||||||
"""
|
|
||||||
emu = _get_emulator()
|
|
||||||
|
|
||||||
try:
|
|
||||||
if value_type == "hex":
|
|
||||||
data = bytes.fromhex(value)
|
|
||||||
elif value_type == "int":
|
|
||||||
int_val = int(value)
|
|
||||||
byte_len = max(1, (int_val.bit_length() + 7) // 8)
|
|
||||||
data = int_val.to_bytes(byte_len, "big") # OBD uses big-endian
|
|
||||||
elif value_type == "string":
|
|
||||||
data = value.encode("utf-8")
|
|
||||||
else:
|
|
||||||
return {"status": "error", "error": f"Unknown value_type: {value_type}"}
|
|
||||||
|
|
||||||
emu.pid_values[pid] = data
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"pid": f"0x{pid:02X}",
|
|
||||||
"value_hex": data.hex(),
|
|
||||||
"value_bytes": list(data),
|
|
||||||
}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_elm327_emu_set_dtcs(dtc_codes: list[str]) -> dict[str, Any]:
|
|
||||||
"""Set diagnostic trouble codes for Mode 03 queries.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
dtc_codes: List of DTC strings (e.g., ["P0301", "P0420", "C0035"]).
|
|
||||||
P = Powertrain, C = Chassis, B = Body, U = Network.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status with configured DTCs.
|
|
||||||
"""
|
|
||||||
emu = _get_emulator()
|
|
||||||
emu.dtc_codes = [c.upper() for c in dtc_codes]
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"dtcs": emu.dtc_codes,
|
|
||||||
"count": len(emu.dtc_codes),
|
|
||||||
}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_elm327_emu_set_vin(vin: str) -> dict[str, Any]:
|
|
||||||
"""Set the Vehicle Identification Number for Mode 09 queries.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
vin: 17-character VIN string.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status with configured VIN.
|
|
||||||
"""
|
|
||||||
emu = _get_emulator()
|
|
||||||
if len(vin) != 17:
|
|
||||||
return {"status": "error", "error": f"VIN must be 17 characters (got {len(vin)})"}
|
|
||||||
emu.vin = vin.upper()
|
|
||||||
return {"status": "ok", "vin": emu.vin}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_elm327_emu_read_commands(
|
|
||||||
since_index: int = 0,
|
|
||||||
limit: int = 50,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Read commands received from the OBD-II client.
|
|
||||||
|
|
||||||
Monitor what the connected app is requesting. Useful for debugging
|
|
||||||
protocol issues or understanding app behavior.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
since_index: Only return commands with index >= this value.
|
|
||||||
limit: Maximum commands to return.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of received commands with timestamps and responses.
|
|
||||||
"""
|
|
||||||
emu = _get_emulator()
|
|
||||||
cmds = emu.get_commands(since_index, limit)
|
|
||||||
next_idx = max((c["index"] for c in cmds), default=since_index) + 1 if cmds else since_index
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"commands": cmds,
|
|
||||||
"count": len(cmds),
|
|
||||||
"next_index": next_idx,
|
|
||||||
}
|
|
||||||
@ -1,392 +0,0 @@
|
|||||||
"""GATT Server tools for Bluetooth MCP server.
|
|
||||||
|
|
||||||
These tools let Linux act as a BLE peripheral (GATT server) that
|
|
||||||
advertises services, accepts connections, and handles reads/writes
|
|
||||||
from remote central devices. Use cases include: emulating sensors,
|
|
||||||
building test harnesses, mocking BLE devices, protocol fuzzing.
|
|
||||||
|
|
||||||
Typical flow:
|
|
||||||
1. bt_gatt_server_add_service(uuid) → service0
|
|
||||||
2. bt_gatt_server_add_characteristic(service0, ...) → service0/char0
|
|
||||||
3. bt_gatt_server_register(adapter="hci0") → registered
|
|
||||||
4. bt_gatt_server_advertise(adapter="hci0", enable=True, name="MyDevice")
|
|
||||||
5. Remote connects, writes → bt_gatt_server_read_writes()
|
|
||||||
6. Respond via bt_gatt_server_set_value() → auto-notifies
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from fastmcp import FastMCP
|
|
||||||
|
|
||||||
from mcbluetooth.gatt_server import get_gatt_server, shutdown_gatt_server
|
|
||||||
|
|
||||||
|
|
||||||
def _parse_value(value: str, value_type: str) -> bytes:
|
|
||||||
"""Convert a string value to bytes based on value_type."""
|
|
||||||
if value_type == "hex":
|
|
||||||
return bytes.fromhex(value)
|
|
||||||
elif value_type == "string":
|
|
||||||
return value.encode("utf-8")
|
|
||||||
elif value_type == "int":
|
|
||||||
int_val = int(value)
|
|
||||||
byte_len = max(1, (int_val.bit_length() + 7) // 8)
|
|
||||||
return int_val.to_bytes(byte_len, "little")
|
|
||||||
else:
|
|
||||||
raise ValueError(f"Unknown value_type: {value_type}")
|
|
||||||
|
|
||||||
|
|
||||||
def register_tools(mcp: FastMCP) -> None:
|
|
||||||
"""Register GATT server tools with the MCP server."""
|
|
||||||
|
|
||||||
# ---- Lifecycle ----
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_register(adapter: str) -> dict[str, Any]:
|
|
||||||
"""Register the GATT application with BlueZ.
|
|
||||||
|
|
||||||
Exports all added services/characteristics to D-Bus and calls
|
|
||||||
GattManager1.RegisterApplication. Must add at least one service
|
|
||||||
before registering.
|
|
||||||
|
|
||||||
After registration, remote BLE centrals can discover and interact
|
|
||||||
with the hosted services (once advertising is started).
|
|
||||||
|
|
||||||
Args:
|
|
||||||
adapter: Adapter name (e.g., "hci0").
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Registration status with service count.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
await mgr.register(adapter)
|
|
||||||
status = mgr.get_status()
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"registered": True,
|
|
||||||
"service_count": len(status["services"]),
|
|
||||||
}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_unregister(adapter: str) -> dict[str, Any]:
|
|
||||||
"""Unregister the GATT application from BlueZ.
|
|
||||||
|
|
||||||
Stops advertising (if active) and removes the application
|
|
||||||
registration. Services remain in memory and can be re-registered.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
adapter: Adapter name (e.g., "hci0").
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status confirming unregistration.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
await shutdown_gatt_server(adapter)
|
|
||||||
return {"status": "ok", "registered": False}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_status() -> dict[str, Any]:
|
|
||||||
"""Get GATT server status.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
- registered: Whether the application is registered with BlueZ
|
|
||||||
- advertising: Whether BLE advertising is active
|
|
||||||
- services: List of services with their characteristics
|
|
||||||
- write_event_count: Number of buffered write events
|
|
||||||
"""
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
return {"status": "ok", **mgr.get_status()}
|
|
||||||
|
|
||||||
# ---- Service construction ----
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_add_service(
|
|
||||||
uuid: str,
|
|
||||||
primary: bool = True,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Add a GATT service to the server.
|
|
||||||
|
|
||||||
Services must be added BEFORE registering with BlueZ.
|
|
||||||
If already registered, unregister first, add services, then re-register.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
uuid: Service UUID (e.g., "180F" for Battery Service, or full
|
|
||||||
128-bit "6E400001-B5A3-F393-E0A9-E50E24DCCA9E" for custom).
|
|
||||||
primary: Whether this is a primary service (default True).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The assigned service_id (e.g., "service0") for adding characteristics.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
service_id = mgr.add_service(uuid, primary)
|
|
||||||
return {"status": "ok", "service_id": service_id, "uuid": uuid}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_add_characteristic(
|
|
||||||
service_id: str,
|
|
||||||
uuid: str,
|
|
||||||
flags: list[str],
|
|
||||||
value: str | None = None,
|
|
||||||
value_type: str = "hex",
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Add a characteristic to a GATT service.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
service_id: Service to add to (e.g., "service0").
|
|
||||||
uuid: Characteristic UUID.
|
|
||||||
flags: List of flags/permissions. Common values:
|
|
||||||
"read", "write", "write-without-response", "notify", "indicate",
|
|
||||||
"encrypt-read", "encrypt-write".
|
|
||||||
value: Initial value (optional). Format depends on value_type.
|
|
||||||
value_type: How to interpret value — "hex", "string", or "int".
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The assigned char_id (e.g., "service0/char0") for setting values.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
initial = _parse_value(value, value_type) if value else b""
|
|
||||||
char_id = mgr.add_characteristic(service_id, uuid, flags, initial)
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"char_id": char_id,
|
|
||||||
"uuid": uuid,
|
|
||||||
"flags": flags,
|
|
||||||
}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_add_descriptor(
|
|
||||||
char_id: str,
|
|
||||||
uuid: str,
|
|
||||||
flags: list[str],
|
|
||||||
value: str | None = None,
|
|
||||||
value_type: str = "hex",
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Add a descriptor to a GATT characteristic.
|
|
||||||
|
|
||||||
Note: CCCD (0x2902) for notify/indicate is auto-managed by BlueZ —
|
|
||||||
don't add it manually.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
char_id: Characteristic to add to (e.g., "service0/char0").
|
|
||||||
uuid: Descriptor UUID (e.g., "2901" for Characteristic User Description).
|
|
||||||
flags: List of flags (e.g., ["read"], ["read", "write"]).
|
|
||||||
value: Initial value (optional).
|
|
||||||
value_type: How to interpret value — "hex", "string", or "int".
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The assigned desc_id.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
initial = _parse_value(value, value_type) if value else b""
|
|
||||||
desc_id = mgr.add_descriptor(char_id, uuid, flags, initial)
|
|
||||||
return {"status": "ok", "desc_id": desc_id, "uuid": uuid}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_clear() -> dict[str, Any]:
|
|
||||||
"""Remove all services, characteristics, and descriptors.
|
|
||||||
|
|
||||||
Must unregister first. Use this to rebuild the service hierarchy
|
|
||||||
from scratch.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status confirming services were cleared.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
mgr.clear()
|
|
||||||
return {"status": "ok", "cleared": True}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
# ---- Value management ----
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_set_value(
|
|
||||||
char_id: str,
|
|
||||||
value: str,
|
|
||||||
value_type: str = "hex",
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Set a characteristic value on the GATT server.
|
|
||||||
|
|
||||||
If a remote client has subscribed to notifications on this
|
|
||||||
characteristic, the new value is automatically pushed via
|
|
||||||
BLE notification.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
char_id: Characteristic ID (e.g., "service0/char0").
|
|
||||||
value: Value to set.
|
|
||||||
value_type: How to interpret value — "hex" (e.g., "0102ff"),
|
|
||||||
"string" (UTF-8 text), or "int" (decimal number).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status with value details and whether notification was sent.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
data = _parse_value(value, value_type)
|
|
||||||
char = mgr._characteristics.get(char_id)
|
|
||||||
was_notifying = char.notifying if char else False
|
|
||||||
mgr.set_value(char_id, data)
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"char_id": char_id,
|
|
||||||
"value_hex": data.hex(),
|
|
||||||
"length": len(data),
|
|
||||||
"notified": was_notifying,
|
|
||||||
}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_get_value(char_id: str) -> dict[str, Any]:
|
|
||||||
"""Read a characteristic value from the server side.
|
|
||||||
|
|
||||||
This reads the value stored on OUR server, not from a remote device.
|
|
||||||
Use bt_ble_read() for reading from remote GATT servers.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
char_id: Characteristic ID (e.g., "service0/char0").
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
The current value as hex, bytes, and string (if decodable).
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
value = mgr.get_value(char_id)
|
|
||||||
result: dict[str, Any] = {
|
|
||||||
"status": "ok",
|
|
||||||
"char_id": char_id,
|
|
||||||
"value_hex": value.hex(),
|
|
||||||
"value_bytes": list(value),
|
|
||||||
"length": len(value),
|
|
||||||
}
|
|
||||||
try:
|
|
||||||
result["value_string"] = value.decode("utf-8")
|
|
||||||
except UnicodeDecodeError:
|
|
||||||
pass
|
|
||||||
return result
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_notify(char_id: str) -> dict[str, Any]:
|
|
||||||
"""Explicitly send a BLE notification for the current value.
|
|
||||||
|
|
||||||
Normally, bt_gatt_server_set_value auto-notifies. Use this to
|
|
||||||
re-send the current value without changing it.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
char_id: Characteristic ID (e.g., "service0/char0").
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status indicating whether notification was sent.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
sent = mgr.notify(char_id)
|
|
||||||
if sent:
|
|
||||||
return {"status": "ok", "notified": True}
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"notified": False,
|
|
||||||
"reason": "No client subscribed to notifications",
|
|
||||||
}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
# ---- Advertising ----
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_advertise(
|
|
||||||
adapter: str,
|
|
||||||
enable: bool,
|
|
||||||
name: str = "mcbluetooth",
|
|
||||||
service_uuids: list[str] | None = None,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Start or stop BLE advertising.
|
|
||||||
|
|
||||||
When advertising, the device becomes discoverable to BLE scanners
|
|
||||||
and can accept connections from central devices.
|
|
||||||
|
|
||||||
Must register the GATT application first.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
adapter: Adapter name (e.g., "hci0").
|
|
||||||
enable: True to start advertising, False to stop.
|
|
||||||
name: Local name shown to scanners (default "mcbluetooth").
|
|
||||||
service_uuids: UUIDs to include in advertisement. If not specified,
|
|
||||||
uses all registered service UUIDs.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Advertising status.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
await mgr.set_advertising(adapter, enable, name=name, service_uuids=service_uuids)
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"advertising": enable,
|
|
||||||
"name": name if enable else None,
|
|
||||||
}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
# ---- Write event monitoring ----
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_read_writes(
|
|
||||||
since_index: int = 0,
|
|
||||||
char_id: str | None = None,
|
|
||||||
limit: int = 50,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Read write events from remote BLE clients.
|
|
||||||
|
|
||||||
When a remote central device writes to a server characteristic,
|
|
||||||
the write is buffered here. Poll this to see incoming commands/data.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
since_index: Only return events with index >= this value.
|
|
||||||
Start with 0, then use the highest returned index + 1.
|
|
||||||
char_id: Filter to writes on this characteristic (optional).
|
|
||||||
limit: Maximum events to return (default 50).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of write events with value, timestamp, and source device.
|
|
||||||
"""
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
events = mgr.get_write_events(since_index, char_id, limit)
|
|
||||||
next_index = (
|
|
||||||
max((e["index"] for e in events), default=since_index) + 1 if events else since_index
|
|
||||||
)
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"events": events,
|
|
||||||
"count": len(events),
|
|
||||||
"next_index": next_index,
|
|
||||||
}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_gatt_server_clear_writes() -> dict[str, Any]:
|
|
||||||
"""Clear the write event buffer.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Number of events cleared.
|
|
||||||
"""
|
|
||||||
mgr = get_gatt_server()
|
|
||||||
count = mgr.clear_write_events()
|
|
||||||
return {"status": "ok", "cleared_count": count}
|
|
||||||
@ -1,243 +0,0 @@
|
|||||||
"""SPP (Serial Port Profile) tools for Bluetooth MCP server.
|
|
||||||
|
|
||||||
Classic Bluetooth serial communication over RFCOMM. SPP provides raw
|
|
||||||
bidirectional byte streams — the classic BT equivalent of BLE's Nordic
|
|
||||||
UART Service.
|
|
||||||
|
|
||||||
Typical flow:
|
|
||||||
Server mode:
|
|
||||||
1. bt_spp_enable() → Register SPP profile
|
|
||||||
2. Make adapter discoverable → Remote device connects
|
|
||||||
3. bt_spp_send(address, "Hello\r\n") → Send data
|
|
||||||
4. bt_spp_recv() → Poll received data
|
|
||||||
Client mode:
|
|
||||||
1. bt_spp_enable() → Register SPP profile
|
|
||||||
2. bt_spp_connect(adapter, address) → Connect to remote SPP
|
|
||||||
3. bt_spp_send / bt_spp_recv → Bidirectional I/O
|
|
||||||
"""
|
|
||||||
|
|
||||||
from __future__ import annotations
|
|
||||||
|
|
||||||
from typing import Any, Literal
|
|
||||||
|
|
||||||
from fastmcp import FastMCP
|
|
||||||
|
|
||||||
from mcbluetooth.dbus_client import get_client
|
|
||||||
from mcbluetooth.spp import (
|
|
||||||
SPP_UUID,
|
|
||||||
disable_spp,
|
|
||||||
enable_spp,
|
|
||||||
get_spp,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def register_tools(mcp: FastMCP) -> None:
|
|
||||||
"""Register SPP tools with the MCP server."""
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_spp_enable(
|
|
||||||
uuid: str = SPP_UUID,
|
|
||||||
channel: int = 0,
|
|
||||||
name: str = "mcbluetooth SPP",
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Enable SPP (Serial Port Profile) for classic Bluetooth serial.
|
|
||||||
|
|
||||||
Registers an RFCOMM serial profile with BlueZ. After enabling,
|
|
||||||
remote devices can connect for bidirectional serial communication,
|
|
||||||
or use bt_spp_connect() to initiate outbound connections.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
uuid: Service UUID. Default is standard SPP (0x1101).
|
|
||||||
Use custom UUIDs for Arduino/ESP32 with non-standard RFCOMM.
|
|
||||||
channel: RFCOMM channel (0 = auto-assign, recommended).
|
|
||||||
name: Profile display name visible during service discovery.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Registration status with active UUID.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
await enable_spp(uuid=uuid, channel=channel, name=name)
|
|
||||||
return {"status": "ok", "uuid": uuid, "channel": channel or "auto", "name": name}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_spp_disable() -> dict[str, Any]:
|
|
||||||
"""Disable SPP and close all serial connections.
|
|
||||||
|
|
||||||
Unregisters the SPP profile from BlueZ and terminates any active
|
|
||||||
RFCOMM sessions.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Status confirming profile removal.
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
await disable_spp()
|
|
||||||
return {"status": "ok", "disabled": True}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_spp_status() -> dict[str, Any]:
|
|
||||||
"""Get SPP status: registration state, connections, and buffer stats.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
- registered: Whether the SPP profile is active
|
|
||||||
- uuid: The UUID the profile was registered with
|
|
||||||
- connections: Active peers with role, duration, byte counters
|
|
||||||
- recv_buffer_count: Buffered received data events
|
|
||||||
- recv_buffer_total: Total events received since enable
|
|
||||||
"""
|
|
||||||
profile = await get_spp()
|
|
||||||
if not profile:
|
|
||||||
return {"status": "ok", "registered": False, "connections": []}
|
|
||||||
return {"status": "ok", **profile.get_status()}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_spp_connect(
|
|
||||||
adapter: str,
|
|
||||||
address: str,
|
|
||||||
uuid: str = SPP_UUID,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Connect to a remote device's SPP service (client mode).
|
|
||||||
|
|
||||||
Initiates an outbound RFCOMM connection. The SPP profile must be
|
|
||||||
enabled first (bt_spp_enable). BlueZ will deliver the RFCOMM fd
|
|
||||||
through our Profile1 handler automatically.
|
|
||||||
|
|
||||||
The device should already be paired and trusted.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
adapter: Bluetooth adapter (e.g. "hci0").
|
|
||||||
address: Remote device Bluetooth address.
|
|
||||||
uuid: SPP service UUID on the remote device.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Connection status.
|
|
||||||
"""
|
|
||||||
profile = await get_spp()
|
|
||||||
if not profile:
|
|
||||||
return {"status": "error", "error": "SPP not enabled — call bt_spp_enable() first"}
|
|
||||||
|
|
||||||
try:
|
|
||||||
client = await get_client()
|
|
||||||
await client.connect_profile(adapter, address, uuid)
|
|
||||||
# BlueZ fires NewConnection on our Profile1 handler — tag as client
|
|
||||||
device_path = f"/org/bluez/{adapter}/dev_{address.upper().replace(':', '_')}"
|
|
||||||
profile.add_client_connection(device_path, address, -1)
|
|
||||||
return {"status": "ok", "address": address, "role": "client"}
|
|
||||||
except Exception as exc:
|
|
||||||
return {"status": "error", "error": str(exc)}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_spp_disconnect(address: str) -> dict[str, Any]:
|
|
||||||
"""Disconnect a specific SPP peer.
|
|
||||||
|
|
||||||
Closes the RFCOMM socket and removes the connection.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
address: Bluetooth address of the peer to disconnect.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Disconnect status.
|
|
||||||
"""
|
|
||||||
profile = await get_spp()
|
|
||||||
if not profile:
|
|
||||||
return {"status": "error", "error": "SPP not enabled"}
|
|
||||||
|
|
||||||
conn = profile._get_connection(address)
|
|
||||||
if not conn:
|
|
||||||
return {"status": "error", "error": f"No SPP connection to {address}"}
|
|
||||||
|
|
||||||
profile.connections.pop(conn.device_path, None)
|
|
||||||
profile._cleanup_connection(conn)
|
|
||||||
return {"status": "ok", "address": address, "disconnected": True}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_spp_send(
|
|
||||||
address: str,
|
|
||||||
data: str,
|
|
||||||
data_type: Literal["string", "hex", "line"] = "string",
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Send data to a connected SPP peer.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
address: Bluetooth address of the peer.
|
|
||||||
data: The data to send.
|
|
||||||
data_type: How to interpret the data parameter:
|
|
||||||
- "string": Send as-is (UTF-8 encoded)
|
|
||||||
- "hex": Parse as hex string (e.g. "48656c6c6f")
|
|
||||||
- "line": Append CR+LF for line-oriented protocols
|
|
||||||
(AT commands, NMEA, etc.)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Send status with byte count.
|
|
||||||
"""
|
|
||||||
profile = await get_spp()
|
|
||||||
if not profile:
|
|
||||||
return {"status": "error", "error": "SPP not enabled"}
|
|
||||||
|
|
||||||
if data_type == "hex":
|
|
||||||
try:
|
|
||||||
raw = bytes.fromhex(data)
|
|
||||||
except ValueError as exc:
|
|
||||||
return {"status": "error", "error": f"Invalid hex: {exc}"}
|
|
||||||
elif data_type == "line":
|
|
||||||
raw = (data + "\r\n").encode("utf-8")
|
|
||||||
else:
|
|
||||||
raw = data.encode("utf-8")
|
|
||||||
|
|
||||||
ok = await profile.send(address, raw)
|
|
||||||
if ok:
|
|
||||||
return {"status": "ok", "bytes_sent": len(raw)}
|
|
||||||
return {"status": "error", "error": f"No active SPP connection to {address}"}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_spp_recv(
|
|
||||||
since_index: int = 0,
|
|
||||||
address: str | None = None,
|
|
||||||
limit: int = 50,
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Read received data from SPP connections (cursor-based polling).
|
|
||||||
|
|
||||||
Returns buffered data events since the given index. Use the highest
|
|
||||||
returned index + 1 as since_index for the next poll to avoid
|
|
||||||
duplicates.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
since_index: Return events with index >= this value.
|
|
||||||
Start at 0 for first call, then use last index + 1.
|
|
||||||
address: Filter by peer address (optional).
|
|
||||||
limit: Maximum number of events to return.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
List of data events with index, timestamp, address, and values.
|
|
||||||
"""
|
|
||||||
profile = await get_spp()
|
|
||||||
if not profile:
|
|
||||||
return {"status": "ok", "events": [], "hint": "SPP not enabled"}
|
|
||||||
|
|
||||||
events = profile.get_recv_events(since_index=since_index, address=address, limit=limit)
|
|
||||||
return {
|
|
||||||
"status": "ok",
|
|
||||||
"count": len(events),
|
|
||||||
"events": events,
|
|
||||||
"next_index": events[-1]["index"] + 1 if events else since_index,
|
|
||||||
}
|
|
||||||
|
|
||||||
@mcp.tool()
|
|
||||||
async def bt_spp_clear_recv() -> dict[str, Any]:
|
|
||||||
"""Clear the SPP receive buffer.
|
|
||||||
|
|
||||||
Removes all buffered received data events. The index counter
|
|
||||||
continues from where it left off (not reset to 0).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Count of events cleared.
|
|
||||||
"""
|
|
||||||
profile = await get_spp()
|
|
||||||
if not profile:
|
|
||||||
return {"status": "ok", "cleared": 0}
|
|
||||||
|
|
||||||
count = profile.clear_recv_events()
|
|
||||||
return {"status": "ok", "cleared": count}
|
|
||||||
Loading…
x
Reference in New Issue
Block a user