Add BLE notification capture via MCP resources

Implements async GATT notification buffering with D-Bus signal subscription.
Notifications are captured in circular buffers (100 values) and exposed via
dynamic MCP resources for polling or subscription.

New resources:
- bluetooth://ble/notifications (list active subscriptions)
- bluetooth://ble/{addr}/{uuid}/notifications (latest value + stats)
- bluetooth://ble/{addr}/{uuid}/notifications/history (buffered history)

New tools: bt_ble_notification_status, bt_ble_clear_notification_buffer
Updated bt_ble_notify to return resource URIs for easy access.
This commit is contained in:
Ryan Malloy 2026-02-09 11:38:38 -07:00
parent 3df5a25f18
commit 766e8abfb4
5 changed files with 629 additions and 32 deletions

View File

@ -0,0 +1,144 @@
"""MCP subscription manager for BLE notifications.
This module bridges BlueZ D-Bus notification signals to MCP resource subscriptions.
When a BLE device sends a GATT notification, subscribed MCP clients receive a
`notifications/resources/updated` message for the corresponding resource URI.
"""
from __future__ import annotations
import asyncio
import logging
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fastmcp import FastMCP
from mcbluetooth.dbus_client import BLENotifyManager
logger = logging.getLogger(__name__)
def make_notification_uri(address: str, char_uuid: str) -> str:
"""Construct the MCP resource URI for a notification subscription.
Args:
address: Device Bluetooth address (e.g., "AA:BB:CC:DD:EE:FF")
char_uuid: Characteristic UUID (full form)
Returns:
Resource URI like "bluetooth://ble/AA:BB:CC:DD:EE:FF/00002a37-.../notifications"
"""
# Normalize address (uppercase, colons)
addr_normalized = address.upper()
# Keep UUID lowercase for consistency
uuid_normalized = char_uuid.lower()
return f"bluetooth://ble/{addr_normalized}/{uuid_normalized}/notifications"
def parse_notification_uri(uri: str) -> tuple[str, str] | None:
"""Parse a notification resource URI to extract address and char_uuid.
Args:
uri: Resource URI
Returns:
Tuple of (address, char_uuid) or None if not a valid notification URI
"""
prefix = "bluetooth://ble/"
suffix = "/notifications"
if not uri.startswith(prefix) or not uri.endswith(suffix):
return None
# Extract the middle part: "AA:BB:CC:DD:EE:FF/uuid"
middle = uri[len(prefix) : -len(suffix)]
parts = middle.split("/")
if len(parts) != 2:
return None
address, char_uuid = parts
return address, char_uuid
class BLESubscriptionManager:
"""Manages MCP subscriptions for BLE notification resources.
This class:
- Tracks which resource URIs have been subscribed to
- Sends resource_updated notifications when D-Bus signals arrive
- Handles automatic cleanup
"""
def __init__(self, mcp: FastMCP, notify_manager: BLENotifyManager):
self._mcp = mcp
self._notify_manager = notify_manager
self._subscribed_uris: set[str] = set()
self._event_loop: asyncio.AbstractEventLoop | None = None
# Register callback to receive notification events
notify_manager.on_notification(self._on_ble_notification)
def _on_ble_notification(self, address: str, char_uuid: str, value: bytes) -> None:
"""Callback when a BLE notification is received.
This is called synchronously from the D-Bus signal handler.
We schedule the async MCP notification on the event loop.
"""
uri = make_notification_uri(address, char_uuid)
# Check if anyone is subscribed
if uri not in self._subscribed_uris:
return
# Schedule the resource update notification
if self._event_loop is None:
try:
self._event_loop = asyncio.get_running_loop()
except RuntimeError:
logger.warning("No event loop available for notification dispatch")
return
# Fire-and-forget the notification
self._event_loop.call_soon_threadsafe(
lambda: asyncio.create_task(self._send_resource_updated(uri))
)
async def _send_resource_updated(self, uri: str) -> None:
"""Send a resource_updated notification to MCP clients."""
try:
# Access the underlying MCP server to send notifications
# FastMCP exposes this through _mcp_server
server = getattr(self._mcp, "_mcp_server", None)
if server is None:
logger.debug("MCP server not available for resource notifications")
return
# The MCP protocol uses notifications/resources/updated
# We need to send this through the server's notification mechanism
await server.request_context.session.send_resource_updated(uri)
logger.debug(f"Sent resource_updated for {uri}")
except Exception as e:
# Don't crash on notification failures
logger.debug(f"Failed to send resource_updated: {e}")
def subscribe(self, uri: str) -> None:
"""Mark a URI as subscribed (for tracking purposes)."""
self._subscribed_uris.add(uri)
def unsubscribe(self, uri: str) -> None:
"""Mark a URI as unsubscribed."""
self._subscribed_uris.discard(uri)
def is_subscribed(self, uri: str) -> bool:
"""Check if a URI is currently subscribed."""
return uri in self._subscribed_uris
def list_subscriptions(self) -> list[str]:
"""List all subscribed URIs."""
return list(self._subscribed_uris)
def clear_subscriptions(self) -> None:
"""Clear all subscriptions."""
self._subscribed_uris.clear()

View File

@ -18,7 +18,10 @@ Object paths follow this pattern:
from __future__ import annotations
from collections import deque
from collections.abc import Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from dbus_fast import BusType, Variant
@ -134,6 +137,219 @@ class GattCharacteristic:
notifying: bool = False
@dataclass
class NotificationValue:
"""A single BLE notification value with timestamp."""
timestamp: datetime
value: bytes
def to_dict(self) -> dict[str, Any]:
"""Convert to JSON-serializable dict."""
return {
"timestamp": self.timestamp.isoformat(),
"value_hex": self.value.hex(),
"value_bytes": list(self.value),
}
@dataclass
class NotificationBuffer:
"""Circular buffer for BLE notification values."""
address: str
char_uuid: str
char_path: str
max_size: int = 100
values: deque[NotificationValue] = field(default_factory=lambda: deque(maxlen=100))
total_received: int = 0
notifying: bool = True
def __post_init__(self) -> None:
# Ensure deque has correct maxlen
if not isinstance(self.values, deque) or self.values.maxlen != self.max_size:
self.values = deque(maxlen=self.max_size)
def add(self, value: bytes) -> NotificationValue:
"""Add a notification value to the buffer."""
notification = NotificationValue(
timestamp=datetime.now(UTC),
value=value,
)
self.values.append(notification)
self.total_received += 1
return notification
@property
def latest(self) -> NotificationValue | None:
"""Get the most recent notification value."""
return self.values[-1] if self.values else None
def get_history(self, count: int = 10) -> list[NotificationValue]:
"""Get the most recent N notification values."""
return list(self.values)[-count:]
# Type alias for notification callbacks
NotificationCallback = Callable[[str, str, bytes], None] # (address, char_uuid, value)
class BLENotifyManager:
"""Manages BLE GATT notification subscriptions and buffers.
This class handles:
- D-Bus signal subscriptions for PropertiesChanged on characteristics
- Circular buffers for notification values per characteristic
- Callbacks for notifying higher layers of new values
"""
def __init__(self, client: BlueZClient):
self._client = client
self._buffers: dict[str, NotificationBuffer] = {} # char_path -> buffer
self._signal_handlers: dict[str, Any] = {} # char_path -> handler removal func
self._callbacks: list[NotificationCallback] = []
def on_notification(self, callback: NotificationCallback) -> None:
"""Register a callback for notification events.
The callback receives (address, char_uuid, value) when a notification arrives.
"""
self._callbacks.append(callback)
def remove_callback(self, callback: NotificationCallback) -> None:
"""Remove a previously registered callback."""
if callback in self._callbacks:
self._callbacks.remove(callback)
async def subscribe(self, char_path: str, address: str, char_uuid: str) -> NotificationBuffer:
"""Subscribe to D-Bus signals for a characteristic.
Args:
char_path: D-Bus object path of the characteristic
address: Device Bluetooth address
char_uuid: Characteristic UUID
Returns:
The notification buffer for this characteristic
"""
# Create or return existing buffer
if char_path in self._buffers:
self._buffers[char_path].notifying = True
return self._buffers[char_path]
buffer = NotificationBuffer(
address=address,
char_uuid=char_uuid,
char_path=char_path,
)
self._buffers[char_path] = buffer
# Subscribe to PropertiesChanged signal
await self._subscribe_to_signal(char_path)
return buffer
async def _subscribe_to_signal(self, char_path: str) -> None:
"""Subscribe to D-Bus PropertiesChanged signal for a characteristic."""
await self._client._ensure_connected()
assert self._client._bus is not None
try:
introspection = await self._client._bus.introspect(BLUEZ_SERVICE, char_path)
proxy = self._client._bus.get_proxy_object(BLUEZ_SERVICE, char_path, introspection)
props_iface = proxy.get_interface(DBUS_PROPS_IFACE)
def on_properties_changed(
interface: str,
changed: dict[str, Any],
invalidated: list[str],
) -> None:
"""Handle PropertiesChanged signal."""
if interface != BLUEZ_GATT_CHAR_IFACE:
return
if "Value" in changed:
value = changed["Value"]
# Unwrap Variant if needed
if isinstance(value, Variant):
value = value.value
# Convert to bytes
raw_bytes = bytes(value) if isinstance(value, (list, bytearray)) else value
buffer = self._buffers.get(char_path)
if buffer:
buffer.add(raw_bytes)
# Fire callbacks
for cb in self._callbacks:
try:
cb(buffer.address, buffer.char_uuid, raw_bytes)
except Exception:
pass # Don't let callback errors break signal handling
# Connect signal handler
props_iface.on_properties_changed(on_properties_changed)
self._signal_handlers[char_path] = (props_iface, on_properties_changed)
except Exception as e:
# Clean up buffer if signal subscription fails
if char_path in self._buffers:
del self._buffers[char_path]
raise RuntimeError(f"Failed to subscribe to notifications: {e}") from e
async def unsubscribe(self, char_path: str) -> None:
"""Unsubscribe from D-Bus signals for a characteristic.
Note: This doesn't stop notifications on the device - use BlueZClient.stop_notify()
for that. This just stops receiving signals in this manager.
"""
if char_path in self._signal_handlers:
props_iface, handler = self._signal_handlers.pop(char_path)
props_iface.off_properties_changed(handler)
if char_path in self._buffers:
self._buffers[char_path].notifying = False
def get_buffer(self, char_path: str) -> NotificationBuffer | None:
"""Get the notification buffer for a characteristic."""
return self._buffers.get(char_path)
def get_buffer_by_address_uuid(self, address: str, char_uuid: str) -> NotificationBuffer | None:
"""Find buffer by device address and characteristic UUID."""
address_upper = address.upper()
char_uuid_lower = char_uuid.lower()
for buffer in self._buffers.values():
if (
buffer.address.upper() == address_upper
and buffer.char_uuid.lower() == char_uuid_lower
):
return buffer
return None
def list_active_subscriptions(self) -> list[dict[str, Any]]:
"""List all active notification subscriptions."""
return [
{
"address": buf.address,
"char_uuid": buf.char_uuid,
"char_path": buf.char_path,
"notifying": buf.notifying,
"buffer_count": len(buf.values),
"total_received": buf.total_received,
}
for buf in self._buffers.values()
]
def clear_buffer(self, char_path: str) -> None:
"""Clear the notification buffer for a characteristic."""
if char_path in self._buffers:
self._buffers[char_path].values.clear()
def remove_buffer(self, char_path: str) -> None:
"""Remove a buffer entirely (for cleanup)."""
if char_path in self._buffers:
del self._buffers[char_path]
class BlueZClient:
"""Async client for BlueZ D-Bus API."""
@ -177,9 +393,7 @@ class BlueZClient:
for path, interfaces in objects.items():
result[path] = {}
for iface, props in interfaces.items():
result[path][iface] = {
k: unwrap_variant(v) for k, v in props.items()
}
result[path][iface] = {k: unwrap_variant(v) for k, v in props.items()}
return result
async def _get_interface(self, path: str, interface: str) -> ProxyInterface:
@ -268,9 +482,7 @@ class BlueZClient:
"""Set adapter discoverable state."""
path = f"/org/bluez/{adapter}"
if timeout > 0:
await self._set_property(
path, BLUEZ_ADAPTER_IFACE, "DiscoverableTimeout", timeout
)
await self._set_property(path, BLUEZ_ADAPTER_IFACE, "DiscoverableTimeout", timeout)
await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Discoverable", discoverable)
async def set_adapter_pairable(self, adapter: str, pairable: bool, timeout: int = 0) -> None:
@ -522,7 +734,10 @@ class BlueZClient:
if service_uuid:
for path, interfaces in objects.items():
if BLUEZ_GATT_SERVICE_IFACE in interfaces:
if interfaces[BLUEZ_GATT_SERVICE_IFACE].get("UUID", "").lower() == service_uuid.lower():
if (
interfaces[BLUEZ_GATT_SERVICE_IFACE].get("UUID", "").lower()
== service_uuid.lower()
):
service_paths.add(path)
characteristics = []
@ -595,6 +810,7 @@ class BlueZClient:
# Global client instance
_client: BlueZClient | None = None
_notify_manager: BLENotifyManager | None = None
async def get_client() -> BlueZClient:
@ -604,3 +820,12 @@ async def get_client() -> BlueZClient:
_client = BlueZClient()
await _client.connect()
return _client
async def get_notify_manager() -> BLENotifyManager:
"""Get or create the global BLE notify manager."""
global _notify_manager
if _notify_manager is None:
client = await get_client()
_notify_manager = BLENotifyManager(client)
return _notify_manager

View File

@ -3,6 +3,7 @@
Resources provide a live, queryable view of Bluetooth state:
- Adapters: Available Bluetooth controllers
- Devices: Filtered by state (visible, paired, connected)
- BLE Notifications: Real-time GATT notification values
Unlike tools which perform actions, resources are read-only snapshots
that clients can poll or subscribe to for state changes.
@ -15,7 +16,7 @@ from dataclasses import asdict
from fastmcp import FastMCP
from mcbluetooth.dbus_client import get_client
from mcbluetooth.dbus_client import get_client, get_notify_manager
def register_resources(mcp: FastMCP) -> None:
@ -196,3 +197,113 @@ def register_resources(mcp: FastMCP) -> None:
# DeviceInfo already has manufacturer_data/service_data as hex strings
return json.dumps(asdict(d), indent=2)
return json.dumps({"error": f"Device '{address}' not found"})
# ==================== BLE Notification Resources ====================
def _format_uuid_short(uuid: str) -> str:
"""Format UUID for display - short form for standard UUIDs."""
if uuid.lower().endswith("-0000-1000-8000-00805f9b34fb"):
short = uuid[:8].lstrip("0") or "0"
return f"0x{short.upper()}"
return uuid
@mcp.resource(
"bluetooth://ble/{address}/{char_uuid}/notifications",
name="BLE Notifications",
description=(
"Current state of a BLE GATT characteristic notification subscription. "
"Returns the latest notification value and buffer statistics. "
"Subscribe to this resource to receive updates when new notifications arrive."
),
mime_type="application/json",
)
async def resource_ble_notifications(address: str, char_uuid: str) -> str:
"""Get current notification state for a characteristic."""
notify_manager = await get_notify_manager()
buffer = notify_manager.get_buffer_by_address_uuid(address, char_uuid)
if not buffer:
return json.dumps(
{
"error": f"No active notification subscription for {address}/{char_uuid}",
"hint": "Use bt_ble_notify to enable notifications first",
}
)
result = {
"address": buffer.address,
"characteristic_uuid": buffer.char_uuid,
"characteristic_uuid_short": _format_uuid_short(buffer.char_uuid),
"notifying": buffer.notifying,
"latest": buffer.latest.to_dict() if buffer.latest else None,
"buffer_count": len(buffer.values),
"total_received": buffer.total_received,
}
return json.dumps(result, indent=2)
@mcp.resource(
"bluetooth://ble/{address}/{char_uuid}/notifications/history",
name="BLE Notification History",
description=(
"Buffered history of BLE GATT notification values. "
"Returns the most recent notification values with timestamps. "
"Default returns last 10 values; use count parameter for more."
),
mime_type="application/json",
)
async def resource_ble_notification_history(
address: str, char_uuid: str, count: int = 10
) -> str:
"""Get notification history for a characteristic."""
notify_manager = await get_notify_manager()
buffer = notify_manager.get_buffer_by_address_uuid(address, char_uuid)
if not buffer:
return json.dumps(
{
"error": f"No active notification subscription for {address}/{char_uuid}",
"hint": "Use bt_ble_notify to enable notifications first",
}
)
# Ensure count is reasonable
count = min(max(1, count), buffer.max_size)
history = buffer.get_history(count)
result = {
"address": buffer.address,
"characteristic_uuid": buffer.char_uuid,
"count": len(history),
"total_available": len(buffer.values),
"total_received": buffer.total_received,
"values": [v.to_dict() for v in history],
}
return json.dumps(result, indent=2)
@mcp.resource(
"bluetooth://ble/notifications",
name="Active BLE Subscriptions",
description=(
"List all active BLE notification subscriptions with their buffer statistics. "
"Useful for monitoring which characteristics are being tracked."
),
mime_type="application/json",
)
async def resource_ble_all_notifications() -> str:
"""List all active notification subscriptions."""
notify_manager = await get_notify_manager()
subscriptions = notify_manager.list_active_subscriptions()
# Enhance with short UUID format
for sub in subscriptions:
sub["uuid_short"] = _format_uuid_short(sub["char_uuid"])
return json.dumps(
{
"count": len(subscriptions),
"subscriptions": subscriptions,
},
indent=2,
)

View File

@ -25,6 +25,17 @@ This server provides comprehensive control over the Linux Bluetooth stack:
- bluetooth://adapter/{name} - Specific adapter details
- bluetooth://device/{address} - Specific device details
### BLE Notification Resources
- bluetooth://ble/notifications - List all active notification subscriptions
- bluetooth://ble/{address}/{char_uuid}/notifications - Current notification state and latest value
- bluetooth://ble/{address}/{char_uuid}/notifications/history - Buffered notification history
To capture BLE notifications:
1. Connect to the device: bt_connect(adapter, address)
2. Enable notifications: bt_ble_notify(adapter, address, char_uuid, enable=True)
3. Read notifications: Use the bluetooth://ble/{address}/{uuid}/notifications resource
4. Subscribe to the resource for real-time updates (client-side)
## Tools
All tools require an explicit 'adapter' parameter (e.g., "hci0").
Use bt_list_adapters() to discover available adapters.

View File

@ -7,7 +7,8 @@ from typing import Any
from fastmcp import FastMCP
from mcbluetooth.dbus_client import get_client
from mcbluetooth.ble_subscriptions import make_notification_uri
from mcbluetooth.dbus_client import get_client, get_notify_manager
# Common BLE service UUIDs
BATTERY_SERVICE_UUID = "0000180f-0000-1000-8000-00805f9b34fb"
@ -57,9 +58,7 @@ def register_tools(mcp: FastMCP) -> None:
# Set BLE-specific filter
uuids = [service_filter] if service_filter else None
await client.set_discovery_filter(
adapter, uuids=uuids, transport="le", duplicate_data=True
)
await client.set_discovery_filter(adapter, uuids=uuids, transport="le", duplicate_data=True)
await client.start_discovery(adapter)
try:
@ -84,11 +83,11 @@ def register_tools(mcp: FastMCP) -> None:
for k, v in d.manufacturer_data.items()
}
svc_data_hex = {
k: v.hex() if isinstance(v, bytes) else str(v)
for k, v in d.service_data.items()
k: v.hex() if isinstance(v, bytes) else str(v) for k, v in d.service_data.items()
}
result.append({
result.append(
{
"address": d.address,
"name": d.name or "(unknown)",
"rssi": d.rssi,
@ -98,7 +97,8 @@ def register_tools(mcp: FastMCP) -> None:
"manufacturer_data": mfr_data_hex,
"service_data": svc_data_hex,
"appearance": d.appearance,
})
}
)
return result
@ -318,8 +318,9 @@ def register_tools(mcp: FastMCP) -> None:
When enabled, the device will send updates when the characteristic
value changes (e.g., heart rate measurements, sensor data).
Note: To receive actual notifications, you would need to set up
a callback - this tool just enables/disables the notification mode.
Notification values are buffered (up to 100) and accessible via:
- Resource: bluetooth://ble/{address}/{char_uuid}/notifications
- Resource: bluetooth://ble/{address}/{char_uuid}/notifications/history
Args:
adapter: Adapter name (e.g., "hci0")
@ -328,7 +329,7 @@ def register_tools(mcp: FastMCP) -> None:
enable: True to enable notifications, False to disable
Returns:
Notification status
Notification status with resource_uri for subscription
"""
client = await get_client()
@ -347,12 +348,33 @@ def register_tools(mcp: FastMCP) -> None:
try:
if enable:
# Start BlueZ notifications
await client.start_notify(target_char.path)
else:
await client.stop_notify(target_char.path)
# Subscribe to D-Bus signals for buffering
notify_manager = await get_notify_manager()
await notify_manager.subscribe(target_char.path, address, target_char.uuid)
# Build resource URI for client subscription
resource_uri = make_notification_uri(address, target_char.uuid)
return {
"status": "notifications_enabled" if enable else "notifications_disabled",
"status": "notifications_enabled",
"uuid": char_uuid,
"uuid_short": _format_uuid(char_uuid),
"resource_uri": resource_uri,
"history_uri": f"{resource_uri}/history",
}
else:
# Stop BlueZ notifications
await client.stop_notify(target_char.path)
# Unsubscribe from D-Bus signals
notify_manager = await get_notify_manager()
await notify_manager.unsubscribe(target_char.path)
return {
"status": "notifications_disabled",
"uuid": char_uuid,
}
except Exception as e:
@ -401,3 +423,87 @@ def register_tools(mcp: FastMCP) -> None:
return {"error": "Invalid battery value"}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
async def bt_ble_notification_status() -> dict[str, Any]:
"""List all active BLE notification subscriptions.
Shows which characteristics currently have notifications enabled
and are being buffered. Each subscription shows buffer statistics
and the resource URIs for accessing notification data.
Returns:
List of active notification subscriptions with buffer stats
"""
notify_manager = await get_notify_manager()
subscriptions = notify_manager.list_active_subscriptions()
# Enhance with resource URIs and short UUIDs
result = []
for sub in subscriptions:
resource_uri = make_notification_uri(sub["address"], sub["char_uuid"])
result.append(
{
"address": sub["address"],
"char_uuid": sub["char_uuid"],
"uuid_short": _format_uuid(sub["char_uuid"]),
"notifying": sub["notifying"],
"buffer_count": sub["buffer_count"],
"total_received": sub["total_received"],
"resource_uri": resource_uri,
"history_uri": f"{resource_uri}/history",
}
)
return {
"count": len(result),
"subscriptions": result,
}
@mcp.tool()
async def bt_ble_clear_notification_buffer(
adapter: str,
address: str,
char_uuid: str,
) -> dict[str, Any]:
"""Clear the notification buffer for a characteristic.
Removes all buffered notification values while keeping
the subscription active. Useful for starting fresh or
freeing memory.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
char_uuid: Characteristic UUID
Returns:
Status of the clear operation
"""
client = await get_client()
# Find the characteristic to get its path
chars = await client.list_gatt_characteristics(adapter, address)
target_char = None
for c in chars:
if c.uuid.lower() == char_uuid.lower():
target_char = c
break
if not target_char:
return {"error": f"Characteristic {char_uuid} not found"}
notify_manager = await get_notify_manager()
buffer = notify_manager.get_buffer(target_char.path)
if not buffer:
return {"error": "No active notification subscription for this characteristic"}
cleared_count = len(buffer.values)
notify_manager.clear_buffer(target_char.path)
return {
"status": "buffer_cleared",
"uuid": char_uuid,
"cleared_count": cleared_count,
}