Add MCP resources, logging, and progress reporting

Resources (dynamic state queries):
- bluetooth://adapters - All Bluetooth adapters
- bluetooth://devices/{paired,connected,visible,trusted}
- bluetooth://adapter/{name} - Adapter details
- bluetooth://device/{address} - Device details

Logging throughout device operations using ctx.info/debug/warning/error
Progress reporting during bt_scan (reports completion percentage)
This commit is contained in:
Ryan Malloy 2026-02-02 02:10:05 -07:00
parent 013cd0eb2f
commit e9e5b0b4e6
4 changed files with 380 additions and 20 deletions

View File

@ -65,6 +65,20 @@ sudo usermod -aG bluetooth $USER
# Or configure polkit for BlueZ D-Bus access
```
## MCP Resources
The server exposes dynamic resources for live state queries:
| Resource URI | Description |
|--------------|-------------|
| `bluetooth://adapters` | All Bluetooth adapters |
| `bluetooth://devices/paired` | Paired devices |
| `bluetooth://devices/connected` | Connected devices |
| `bluetooth://devices/visible` | All known devices |
| `bluetooth://devices/trusted` | Trusted devices |
| `bluetooth://adapter/{name}` | Specific adapter details |
| `bluetooth://device/{address}` | Specific device details |
## MCP Tools
### Adapter Tools

View File

@ -0,0 +1,209 @@
"""MCP Resources for Bluetooth state exposure.
Resources provide a live, queryable view of Bluetooth state:
- Adapters: Available Bluetooth controllers
- Devices: Filtered by state (visible, paired, connected)
Unlike tools which perform actions, resources are read-only snapshots
that clients can poll or subscribe to for state changes.
"""
from __future__ import annotations
import json
from dataclasses import asdict
from fastmcp import FastMCP
from mcbluetooth.dbus_client import get_client
def register_resources(mcp: FastMCP) -> None:
"""Register Bluetooth state resources with the MCP server."""
@mcp.resource(
"bluetooth://adapters",
name="Bluetooth Adapters",
description="List of all Bluetooth adapters on the system with their current state",
mime_type="application/json",
)
async def resource_adapters() -> str:
"""Get all Bluetooth adapters."""
client = await get_client()
adapters = await client.list_adapters()
return json.dumps(
{
"adapters": [
{
"name": a.name,
"address": a.address,
"alias": a.alias,
"powered": a.powered,
"discoverable": a.discoverable,
"discovering": a.discovering,
"pairable": a.pairable,
}
for a in adapters
]
},
indent=2,
)
@mcp.resource(
"bluetooth://devices/paired",
name="Paired Devices",
description="Bluetooth devices that have been paired with this system",
mime_type="application/json",
)
async def resource_devices_paired() -> str:
"""Get all paired Bluetooth devices."""
client = await get_client()
devices = await client.list_devices(filter_type="paired")
return json.dumps(
{
"filter": "paired",
"count": len(devices),
"devices": [
{
"address": d.address,
"name": d.name or d.alias or "(unnamed)",
"connected": d.connected,
"trusted": d.trusted,
"adapter": d.adapter,
"uuids": d.uuids[:5], # First 5 UUIDs
}
for d in devices
],
},
indent=2,
)
@mcp.resource(
"bluetooth://devices/connected",
name="Connected Devices",
description="Bluetooth devices currently connected to this system",
mime_type="application/json",
)
async def resource_devices_connected() -> str:
"""Get all connected Bluetooth devices."""
client = await get_client()
devices = await client.list_devices(filter_type="connected")
return json.dumps(
{
"filter": "connected",
"count": len(devices),
"devices": [
{
"address": d.address,
"name": d.name or d.alias or "(unnamed)",
"paired": d.paired,
"trusted": d.trusted,
"adapter": d.adapter,
"services_resolved": d.services_resolved,
}
for d in devices
],
},
indent=2,
)
@mcp.resource(
"bluetooth://devices/visible",
name="Visible Devices",
description="All Bluetooth devices visible to this system (discovered or known)",
mime_type="application/json",
)
async def resource_devices_visible() -> str:
"""Get all visible/known Bluetooth devices."""
client = await get_client()
devices = await client.list_devices(filter_type="all")
return json.dumps(
{
"filter": "all",
"count": len(devices),
"devices": [
{
"address": d.address,
"name": d.name or d.alias or "(unnamed)",
"rssi": d.rssi,
"paired": d.paired,
"connected": d.connected,
"adapter": d.adapter,
}
for d in devices
],
},
indent=2,
)
@mcp.resource(
"bluetooth://devices/trusted",
name="Trusted Devices",
description="Bluetooth devices marked as trusted (auto-connect enabled)",
mime_type="application/json",
)
async def resource_devices_trusted() -> str:
"""Get all trusted Bluetooth devices."""
client = await get_client()
devices = await client.list_devices(filter_type="trusted")
return json.dumps(
{
"filter": "trusted",
"count": len(devices),
"devices": [
{
"address": d.address,
"name": d.name or d.alias or "(unnamed)",
"paired": d.paired,
"connected": d.connected,
"adapter": d.adapter,
}
for d in devices
],
},
indent=2,
)
# Dynamic resource template for per-adapter info
@mcp.resource(
"bluetooth://adapter/{adapter_name}",
name="Adapter Details",
description="Detailed information about a specific Bluetooth adapter",
mime_type="application/json",
)
async def resource_adapter_info(adapter_name: str) -> str:
"""Get detailed info for a specific adapter."""
client = await get_client()
adapter = await client.get_adapter(adapter_name)
if not adapter:
return json.dumps({"error": f"Adapter '{adapter_name}' not found"})
return json.dumps(asdict(adapter), indent=2)
# Dynamic resource for per-device info
@mcp.resource(
"bluetooth://device/{address}",
name="Device Details",
description="Detailed information about a specific Bluetooth device by address",
mime_type="application/json",
)
async def resource_device_info(address: str) -> str:
"""Get detailed info for a specific device."""
client = await get_client()
# Search across all adapters
devices = await client.list_devices()
for d in devices:
if d.address.upper() == address.upper():
data = asdict(d)
# Convert bytes to hex strings for JSON serialization
if data.get("manufacturer_data"):
data["manufacturer_data"] = {
k: v.hex() if isinstance(v, bytes) else str(v)
for k, v in data["manufacturer_data"].items()
}
if data.get("service_data"):
data["service_data"] = {
k: v.hex() if isinstance(v, bytes) else str(v)
for k, v in data["service_data"].items()
}
return json.dumps(data, indent=2)
return json.dumps({"error": f"Device '{address}' not found"})

View File

@ -2,6 +2,7 @@
from fastmcp import FastMCP
from mcbluetooth import resources
from mcbluetooth.tools import adapter, audio, ble, device
mcp = FastMCP(
@ -14,6 +15,16 @@ This server provides comprehensive control over the Linux Bluetooth stack:
- Audio profiles (A2DP, HFP) with PipeWire/PulseAudio integration
- BLE/GATT services (read/write characteristics, notifications)
## Resources (live state queries)
- bluetooth://adapters - All Bluetooth adapters
- bluetooth://devices/paired - Paired devices
- bluetooth://devices/connected - Connected devices
- bluetooth://devices/visible - All known devices
- bluetooth://devices/trusted - Trusted devices
- bluetooth://adapter/{name} - Specific adapter details
- bluetooth://device/{address} - Specific device details
## Tools
All tools require an explicit 'adapter' parameter (e.g., "hci0").
Use bt_list_adapters() to discover available adapters.
@ -24,6 +35,9 @@ For pairing, use pairing_mode parameter:
""",
)
# Register resources (live state)
resources.register_resources(mcp)
# Register tool modules
adapter.register_tools(mcp)
device.register_tools(mcp)

View File

@ -1,15 +1,18 @@
"""Device discovery and management tools for BlueZ."""
from __future__ import annotations
import asyncio
from dataclasses import asdict
from typing import Any, Literal
from fastmcp import FastMCP
from fastmcp import Context, FastMCP
from mcbluetooth.dbus_client import get_client
# Type aliases for MCP tool parameters
ScanMode = Literal["classic", "ble", "both"]
DeviceFilter = Literal["all", "paired", "connected", "trusted"]
PairingMode = Literal["elicit", "interactive", "auto"]
def register_tools(mcp: FastMCP) -> None:
"""Register device management tools with the MCP server."""
@ -18,7 +21,8 @@ def register_tools(mcp: FastMCP) -> None:
async def bt_scan(
adapter: str,
timeout: int = 10,
mode: Literal["classic", "ble", "both"] = "both",
mode: ScanMode = "both",
ctx: Context | None = None,
) -> list[dict[str, Any]]:
"""Scan for nearby Bluetooth devices.
@ -35,29 +39,48 @@ def register_tools(mcp: FastMCP) -> None:
"""
client = await get_client()
# Log scan start
if ctx:
await ctx.info(f"Starting {mode} scan on {adapter} for {timeout}s")
# Set discovery filter based on mode
transport = {"classic": "bredr", "ble": "le", "both": "auto"}[mode]
await client.set_discovery_filter(adapter, transport=transport, duplicate_data=True)
# Start discovery
await client.start_discovery(adapter)
if ctx:
await ctx.debug(f"Discovery started on {adapter}")
try:
# Wait for scan duration
await asyncio.sleep(timeout)
# Report progress during scan with periodic device count updates
for elapsed in range(timeout):
await asyncio.sleep(1)
if ctx:
# Report progress
await ctx.report_progress(progress=elapsed + 1, total=timeout)
# Periodically log device count
if (elapsed + 1) % 3 == 0 or elapsed == 0:
devices = await client.list_devices(adapter=adapter)
await ctx.debug(f"Scan progress: {len(devices)} devices found")
finally:
# Always stop discovery
await client.stop_discovery(adapter)
await client.remove_discovery_filter(adapter)
if ctx:
await ctx.debug("Discovery stopped")
# Return discovered devices
devices = await client.list_devices(adapter=adapter)
if ctx:
await ctx.info(f"Scan complete: found {len(devices)} devices")
return [asdict(d) for d in devices]
@mcp.tool()
async def bt_list_devices(
adapter: str,
filter: Literal["all", "paired", "connected", "trusted"] = "all",
filter: DeviceFilter = "all",
ctx: Context | None = None,
) -> list[dict[str, Any]]:
"""List known Bluetooth devices.
@ -72,11 +95,19 @@ def register_tools(mcp: FastMCP) -> None:
List of devices matching the filter
"""
client = await get_client()
if ctx:
await ctx.debug(f"Listing {filter} devices on {adapter}")
devices = await client.list_devices(adapter=adapter, filter_type=filter)
if ctx:
await ctx.info(f"Found {len(devices)} {filter} devices")
return [asdict(d) for d in devices]
@mcp.tool()
async def bt_device_info(adapter: str, address: str) -> dict[str, Any] | None:
async def bt_device_info(
adapter: str,
address: str,
ctx: Context | None = None,
) -> dict[str, Any] | None:
"""Get detailed information about a specific device.
Args:
@ -87,14 +118,19 @@ def register_tools(mcp: FastMCP) -> None:
Device properties or None if not found
"""
client = await get_client()
if ctx:
await ctx.debug(f"Getting info for {address}")
info = await client.get_device(adapter, address)
if info and ctx:
await ctx.info(f"Device: {info.name or info.alias or 'unnamed'} ({address})")
return asdict(info) if info else None
@mcp.tool()
async def bt_pair(
adapter: str,
address: str,
pairing_mode: Literal["elicit", "interactive", "auto"] = "interactive",
pairing_mode: PairingMode = "interactive",
ctx: Context | None = None,
) -> dict[str, Any]:
"""Initiate pairing with a device.
@ -114,9 +150,14 @@ def register_tools(mcp: FastMCP) -> None:
"""
client = await get_client()
if ctx:
await ctx.info(f"Initiating pairing with {address} (mode: {pairing_mode})")
# Check if already paired
device = await client.get_device(adapter, address)
if device and device.paired:
if ctx:
await ctx.info(f"Device {address} is already paired")
return {"status": "already_paired", "device": asdict(device)}
if pairing_mode == "auto":
@ -124,26 +165,33 @@ def register_tools(mcp: FastMCP) -> None:
try:
await client.pair_device(adapter, address)
device = await client.get_device(adapter, address)
if ctx:
await ctx.info(f"Successfully paired with {address}")
return {"status": "paired", "device": asdict(device) if device else None}
except Exception as e:
if ctx:
await ctx.error(f"Pairing failed: {e}")
return {"status": "error", "error": str(e)}
else:
# For interactive/elicit modes, we need an agent
# Return status indicating pairing initiated
try:
# Start pairing (this is async and may block for confirmation)
# In a real implementation, this would register an agent
await client.pair_device(adapter, address)
device = await client.get_device(adapter, address)
if ctx:
await ctx.info(f"Successfully paired with {address}")
return {"status": "paired", "device": asdict(device) if device else None}
except Exception as e:
error_msg = str(e)
if "AuthenticationFailed" in error_msg or "AuthenticationCanceled" in error_msg:
if ctx:
await ctx.warning("Pairing requires user confirmation or PIN")
return {
"status": "awaiting_confirmation",
"message": "Pairing requires user confirmation or PIN entry",
"pairing_mode": pairing_mode,
}
if ctx:
await ctx.error(f"Pairing failed: {error_msg}")
return {"status": "error", "error": error_msg}
@mcp.tool()
@ -152,6 +200,7 @@ def register_tools(mcp: FastMCP) -> None:
address: str,
pin: str | None = None,
accept: bool = True,
ctx: Context | None = None,
) -> dict[str, Any]:
"""Confirm or reject a pairing request.
@ -169,23 +218,36 @@ def register_tools(mcp: FastMCP) -> None:
client = await get_client()
if not accept:
if ctx:
await ctx.info(f"Rejecting pairing with {address}")
try:
await client.cancel_pairing(adapter, address)
return {"status": "pairing_cancelled"}
except Exception as e:
if ctx:
await ctx.error(f"Failed to cancel pairing: {e}")
return {"status": "error", "error": str(e)}
# Re-attempt pairing - agent would handle PIN
# For now, this is a simplified implementation
if ctx:
await ctx.info(f"Confirming pairing with {address}")
try:
await client.pair_device(adapter, address)
device = await client.get_device(adapter, address)
if ctx:
await ctx.info(f"Pairing confirmed with {address}")
return {"status": "paired", "device": asdict(device) if device else None}
except Exception as e:
if ctx:
await ctx.error(f"Pairing confirmation failed: {e}")
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_unpair(adapter: str, address: str) -> dict[str, str]:
async def bt_unpair(
adapter: str,
address: str,
ctx: Context | None = None,
) -> dict[str, str]:
"""Remove pairing with a device.
This removes the device from the list of known devices and
@ -199,14 +261,24 @@ def register_tools(mcp: FastMCP) -> None:
Status of the operation
"""
client = await get_client()
if ctx:
await ctx.info(f"Removing device {address}")
try:
await client.remove_device(adapter, address)
if ctx:
await ctx.info(f"Device {address} removed successfully")
return {"status": "removed", "address": address}
except Exception as e:
if ctx:
await ctx.error(f"Failed to remove device: {e}")
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_connect(adapter: str, address: str) -> dict[str, Any]:
async def bt_connect(
adapter: str,
address: str,
ctx: Context | None = None,
) -> dict[str, Any]:
"""Connect to a paired device.
Establishes an active connection to a previously paired device.
@ -220,15 +292,25 @@ def register_tools(mcp: FastMCP) -> None:
Connection status and device info
"""
client = await get_client()
if ctx:
await ctx.info(f"Connecting to {address}")
try:
await client.connect_device(adapter, address)
device = await client.get_device(adapter, address)
if ctx:
await ctx.info(f"Connected to {device.name or address}")
return {"status": "connected", "device": asdict(device) if device else None}
except Exception as e:
if ctx:
await ctx.error(f"Connection failed: {e}")
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_disconnect(adapter: str, address: str) -> dict[str, Any]:
async def bt_disconnect(
adapter: str,
address: str,
ctx: Context | None = None,
) -> dict[str, Any]:
"""Disconnect from a device.
Terminates the active connection but preserves pairing.
@ -241,15 +323,26 @@ def register_tools(mcp: FastMCP) -> None:
Status of the operation
"""
client = await get_client()
if ctx:
await ctx.info(f"Disconnecting from {address}")
try:
await client.disconnect_device(adapter, address)
device = await client.get_device(adapter, address)
if ctx:
await ctx.info(f"Disconnected from {address}")
return {"status": "disconnected", "device": asdict(device) if device else None}
except Exception as e:
if ctx:
await ctx.error(f"Disconnect failed: {e}")
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_trust(adapter: str, address: str, trusted: bool) -> dict[str, Any]:
async def bt_trust(
adapter: str,
address: str,
trusted: bool,
ctx: Context | None = None,
) -> dict[str, Any]:
"""Set device trust status.
Trusted devices can connect automatically without explicit
@ -264,15 +357,27 @@ def register_tools(mcp: FastMCP) -> None:
Updated device info
"""
client = await get_client()
action = "Trusting" if trusted else "Untrusting"
if ctx:
await ctx.info(f"{action} device {address}")
try:
await client.set_device_trusted(adapter, address, trusted)
device = await client.get_device(adapter, address)
if ctx:
await ctx.info(f"Device {address} {'trusted' if trusted else 'untrusted'}")
return {"status": "updated", "device": asdict(device) if device else None}
except Exception as e:
if ctx:
await ctx.error(f"Failed to update trust: {e}")
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_block(adapter: str, address: str, blocked: bool) -> dict[str, Any]:
async def bt_block(
adapter: str,
address: str,
blocked: bool,
ctx: Context | None = None,
) -> dict[str, Any]:
"""Block or unblock a device.
Blocked devices cannot connect to this adapter.
@ -286,15 +391,27 @@ def register_tools(mcp: FastMCP) -> None:
Updated device info
"""
client = await get_client()
action = "Blocking" if blocked else "Unblocking"
if ctx:
await ctx.info(f"{action} device {address}")
try:
await client.set_device_blocked(adapter, address, blocked)
device = await client.get_device(adapter, address)
if ctx:
await ctx.info(f"Device {address} {'blocked' if blocked else 'unblocked'}")
return {"status": "updated", "device": asdict(device) if device else None}
except Exception as e:
if ctx:
await ctx.error(f"Failed to update block status: {e}")
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_device_set_alias(adapter: str, address: str, alias: str) -> dict[str, Any]:
async def bt_device_set_alias(
adapter: str,
address: str,
alias: str,
ctx: Context | None = None,
) -> dict[str, Any]:
"""Set a friendly name for a device.
Args:
@ -306,9 +423,15 @@ def register_tools(mcp: FastMCP) -> None:
Updated device info
"""
client = await get_client()
if ctx:
await ctx.info(f"Setting alias for {address} to '{alias}'")
try:
await client.set_device_alias(adapter, address, alias)
device = await client.get_device(adapter, address)
if ctx:
await ctx.info(f"Alias set for {address}")
return {"status": "updated", "device": asdict(device) if device else None}
except Exception as e:
if ctx:
await ctx.error(f"Failed to set alias: {e}")
return {"status": "error", "error": str(e)}