Initial implementation of mcbluetooth MCP server

Comprehensive BlueZ MCP server exposing Linux Bluetooth stack to LLMs:

- Adapter management (list, power, discoverable, pairable)
- Device discovery and management (scan, pair, connect, trust)
- Audio profiles with PipeWire/PulseAudio integration (A2DP/HFP switching)
- BLE/GATT support (services, characteristics, read/write/notify)

Built on:
- FastMCP 2.14.4 for MCP protocol
- dbus-fast 4.0.0 for async BlueZ D-Bus communication
- pulsectl-asyncio 1.2.2 for audio control
This commit is contained in:
Ryan Malloy 2026-02-02 02:03:49 -07:00
commit 013cd0eb2f
14 changed files with 2426 additions and 0 deletions

46
.gitignore vendored Normal file
View File

@ -0,0 +1,46 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual environments
.venv/
venv/
ENV/
# IDE
.idea/
.vscode/
*.swp
*.swo
# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
.nox/
# uv
.uv/
uv.lock
# Project specific
*.log

154
README.md Normal file
View File

@ -0,0 +1,154 @@
# mcbluetooth
A comprehensive MCP server exposing the Linux Bluetooth stack (BlueZ) to LLMs.
## Features
### Adapter Management
- List, power, and configure Bluetooth adapters
- Control discoverable and pairable states
- Set adapter aliases
### Device Management
- Classic Bluetooth and BLE scanning with filters
- Pairing with multi-modal support (elicit, interactive, auto)
- Connect/disconnect/trust/block devices
- View device properties including RSSI, UUIDs, manufacturer data
### Audio Profiles (A2DP/HFP)
- List audio devices (sinks, sources, cards)
- Connect/disconnect audio profiles
- Switch between A2DP (high quality) and HFP (calls with mic)
- Volume control and muting
- Set default audio device (PipeWire/PulseAudio integration)
### Bluetooth Low Energy (BLE)
- BLE-specific scanning with name/service filters
- GATT service discovery
- Read/write characteristics
- Enable/disable notifications
- Battery level reading (standard Battery Service)
## Installation
```bash
# Install with uv (recommended)
uvx mcbluetooth
# Or install from source
uv pip install -e .
```
## Usage with Claude Code
```bash
# Add to Claude Code (from source)
claude mcp add mcbluetooth-local -- uv run --directory /path/to/mcbluetooth mcbluetooth
# Or if published to PyPI
claude mcp add mcbluetooth -- uvx mcbluetooth
```
## Requirements
- Linux with BlueZ 5.x
- Python 3.11+
- PipeWire or PulseAudio (for audio features)
- User must be in `bluetooth` group or have polkit permissions
### Permissions
```bash
# Add user to bluetooth group
sudo usermod -aG bluetooth $USER
# Or configure polkit for BlueZ D-Bus access
```
## MCP Tools
### Adapter Tools
| Tool | Description |
|------|-------------|
| `bt_list_adapters` | List all Bluetooth adapters |
| `bt_adapter_info` | Get adapter details |
| `bt_adapter_power` | Power on/off |
| `bt_adapter_discoverable` | Set visible to other devices |
| `bt_adapter_pairable` | Enable/disable pairing acceptance |
| `bt_adapter_set_alias` | Set friendly name |
### Device Tools
| Tool | Description |
|------|-------------|
| `bt_scan` | Scan for devices (classic/BLE/both) |
| `bt_list_devices` | List known devices with filters |
| `bt_device_info` | Get device details |
| `bt_pair` | Initiate pairing |
| `bt_pair_confirm` | Confirm/reject pairing |
| `bt_unpair` | Remove device |
| `bt_connect` | Connect to paired device |
| `bt_disconnect` | Disconnect device |
| `bt_trust` | Trust/untrust device |
| `bt_block` | Block/unblock device |
### Audio Tools
| Tool | Description |
|------|-------------|
| `bt_audio_list` | List all audio devices |
| `bt_audio_connect` | Connect audio profiles |
| `bt_audio_disconnect` | Disconnect audio |
| `bt_audio_set_profile` | Switch A2DP/HFP/off |
| `bt_audio_set_default` | Set as default sink |
| `bt_audio_volume` | Set volume (0-150) |
| `bt_audio_mute` | Mute/unmute |
### BLE Tools
| Tool | Description |
|------|-------------|
| `bt_ble_scan` | BLE scan with filters |
| `bt_ble_services` | List GATT services |
| `bt_ble_characteristics` | List characteristics |
| `bt_ble_read` | Read characteristic value |
| `bt_ble_write` | Write characteristic value |
| `bt_ble_notify` | Enable/disable notifications |
| `bt_ble_battery` | Read battery level |
## Example Prompts
```
# Discover and connect headphones
"Scan for Bluetooth devices and connect to my Sony headphones"
# Switch audio profile for calls
"Switch my headphones to HFP mode for a phone call"
# Read from a fitness tracker
"Connect to my fitness band and read the battery level"
# Set up audio output
"List all Bluetooth audio devices and set my speaker as the default"
```
## Architecture
```
┌─────────────────────────────────────────────┐
│ FastMCP Server │
├─────────────────────────────────────────────┤
│ Tool Categories │
│ ┌─────────┬─────────┬─────────┬─────────┐ │
│ │ Adapter │ Device │ Audio │ BLE │ │
│ │ Tools │ Tools │ Tools │ Tools │ │
│ └─────────┴─────────┴─────────┴─────────┘ │
├─────────────────────────────────────────────┤
│ BlueZ D-Bus Client Layer │
│ (dbus-fast) │
├─────────────────────────────────────────────┤
│ PipeWire/PulseAudio Integration │
│ (pulsectl-asyncio) │
└─────────────────────────────────────────────┘
```
## License
MIT

61
pyproject.toml Normal file
View File

@ -0,0 +1,61 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "mcbluetooth"
version = "2026.02.02"
description = "Comprehensive BlueZ MCP server - expose the full Linux Bluetooth stack to LLMs"
readme = "README.md"
requires-python = ">=3.11"
license = "MIT"
authors = [
{name = "Ryan Malloy", email = "ryan@supported.systems"}
]
keywords = ["bluetooth", "bluez", "mcp", "fastmcp", "dbus", "ble", "gatt"]
classifiers = [
"Development Status :: 4 - Beta",
"Environment :: Console",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: System :: Hardware",
"Topic :: Communications",
]
dependencies = [
"fastmcp>=2.14.4",
"dbus-fast>=4.0.0",
"pulsectl-asyncio>=1.2.2",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.24",
"ruff>=0.8",
]
[project.scripts]
mcbluetooth = "mcbluetooth:main"
[project.urls]
Repository = "https://github.com/ryanmalloy/mcbluetooth"
[tool.hatch.build.targets.wheel]
packages = ["src/mcbluetooth"]
[tool.ruff]
line-length = 100
target-version = "py311"
[tool.ruff.lint]
select = ["E", "F", "W", "I", "B", "UP"]
ignore = ["E501"]
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

View File

@ -0,0 +1,5 @@
"""mcbluetooth - Comprehensive BlueZ MCP server for Linux Bluetooth management."""
from mcbluetooth.server import main, mcp
__all__ = ["mcp", "main"]

300
src/mcbluetooth/audio.py Normal file
View File

@ -0,0 +1,300 @@
"""PipeWire/PulseAudio integration for Bluetooth audio control."""
from __future__ import annotations
from dataclasses import dataclass, field
import pulsectl_asyncio
@dataclass
class AudioSink:
"""Information about an audio sink (output device)."""
name: str
index: int
description: str
muted: bool
volume_percent: int
is_bluetooth: bool
bluetooth_address: str | None
state: str # "running", "idle", "suspended"
properties: dict[str, str] = field(default_factory=dict)
@dataclass
class AudioSource:
"""Information about an audio source (input device)."""
name: str
index: int
description: str
muted: bool
volume_percent: int
is_bluetooth: bool
bluetooth_address: str | None
state: str
properties: dict[str, str] = field(default_factory=dict)
@dataclass
class AudioCard:
"""Information about an audio card (device with profiles)."""
name: str
index: int
driver: str
active_profile: str
profiles: list[str]
is_bluetooth: bool
bluetooth_address: str | None
properties: dict[str, str] = field(default_factory=dict)
def _extract_bt_address(props: dict[str, str]) -> str | None:
"""Extract Bluetooth address from PulseAudio properties."""
# Try various property names used by different versions
for key in ["device.string", "api.bluez5.address", "bluez.path"]:
if key in props:
value = props[key]
# Check if it looks like a BT address
if len(value) == 17 and value.count(":") == 5:
return value.upper()
# Extract from path like /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX
if "dev_" in value:
parts = value.split("dev_")
if len(parts) > 1:
addr = parts[1].split("/")[0].replace("_", ":")
if len(addr) == 17:
return addr.upper()
return None
def _is_bluetooth(props: dict[str, str]) -> bool:
"""Check if device is Bluetooth based on properties."""
# Check various indicators
if "api.bluez5.address" in props:
return True
if "bluez" in props.get("device.api", "").lower():
return True
if "bluetooth" in props.get("device.bus", "").lower():
return True
if props.get("device.string", "").startswith("/org/bluez"):
return True
return False
class PulseAudioClient:
"""Async client for PulseAudio/PipeWire audio control."""
def __init__(self):
self._pulse: pulsectl_asyncio.PulseAsync | None = None
async def connect(self) -> None:
"""Connect to PulseAudio/PipeWire server."""
if self._pulse is not None:
return
self._pulse = pulsectl_asyncio.PulseAsync("mcbluetooth")
await self._pulse.connect()
async def disconnect(self) -> None:
"""Disconnect from audio server."""
if self._pulse:
self._pulse.close()
self._pulse = None
async def _ensure_connected(self) -> None:
"""Ensure we're connected."""
if self._pulse is None:
await self.connect()
async def list_sinks(self) -> list[AudioSink]:
"""List all audio sinks (output devices)."""
await self._ensure_connected()
assert self._pulse is not None
sinks = []
for sink in await self._pulse.sink_list():
props = dict(sink.proplist)
is_bt = _is_bluetooth(props)
bt_addr = _extract_bt_address(props) if is_bt else None
# Calculate average volume percentage
vol_pct = int(sink.volume.value_flat * 100) if sink.volume else 0
sinks.append(
AudioSink(
name=sink.name,
index=sink.index,
description=sink.description or sink.name,
muted=sink.mute == 1,
volume_percent=vol_pct,
is_bluetooth=is_bt,
bluetooth_address=bt_addr,
state=sink.state.name.lower() if hasattr(sink.state, "name") else str(sink.state),
properties=props,
)
)
return sinks
async def list_sources(self) -> list[AudioSource]:
"""List all audio sources (input devices)."""
await self._ensure_connected()
assert self._pulse is not None
sources = []
for source in await self._pulse.source_list():
# Skip monitor sources
if ".monitor" in source.name:
continue
props = dict(source.proplist)
is_bt = _is_bluetooth(props)
bt_addr = _extract_bt_address(props) if is_bt else None
vol_pct = int(source.volume.value_flat * 100) if source.volume else 0
sources.append(
AudioSource(
name=source.name,
index=source.index,
description=source.description or source.name,
muted=source.mute == 1,
volume_percent=vol_pct,
is_bluetooth=is_bt,
bluetooth_address=bt_addr,
state=source.state.name.lower() if hasattr(source.state, "name") else str(source.state),
properties=props,
)
)
return sources
async def list_cards(self) -> list[AudioCard]:
"""List all audio cards (devices with profiles)."""
await self._ensure_connected()
assert self._pulse is not None
cards = []
for card in await self._pulse.card_list():
props = dict(card.proplist)
is_bt = _is_bluetooth(props)
bt_addr = _extract_bt_address(props) if is_bt else None
profile_names = [p.name for p in card.profile_list]
active_profile = card.profile_active.name if card.profile_active else ""
cards.append(
AudioCard(
name=card.name,
index=card.index,
driver=card.driver,
active_profile=active_profile,
profiles=profile_names,
is_bluetooth=is_bt,
bluetooth_address=bt_addr,
properties=props,
)
)
return cards
async def get_default_sink(self) -> str | None:
"""Get the name of the default sink."""
await self._ensure_connected()
assert self._pulse is not None
info = await self._pulse.server_info()
return info.default_sink_name
async def get_default_source(self) -> str | None:
"""Get the name of the default source."""
await self._ensure_connected()
assert self._pulse is not None
info = await self._pulse.server_info()
return info.default_source_name
async def set_default_sink(self, sink_name: str) -> None:
"""Set the default audio sink."""
await self._ensure_connected()
assert self._pulse is not None
await self._pulse.sink_default_set(sink_name)
async def set_default_source(self, source_name: str) -> None:
"""Set the default audio source."""
await self._ensure_connected()
assert self._pulse is not None
await self._pulse.source_default_set(source_name)
async def set_sink_volume(self, sink_name: str, volume_percent: int) -> None:
"""Set sink volume (0-100)."""
await self._ensure_connected()
assert self._pulse is not None
volume_percent = max(0, min(150, volume_percent)) # Allow up to 150%
vol = volume_percent / 100.0
# Find the sink
for sink in await self._pulse.sink_list():
if sink.name == sink_name:
await self._pulse.volume_set_all_chans(sink, vol)
return
raise ValueError(f"Sink not found: {sink_name}")
async def set_sink_mute(self, sink_name: str, muted: bool) -> None:
"""Mute or unmute a sink."""
await self._ensure_connected()
assert self._pulse is not None
for sink in await self._pulse.sink_list():
if sink.name == sink_name:
await self._pulse.sink_mute(sink.index, muted)
return
raise ValueError(f"Sink not found: {sink_name}")
async def set_card_profile(self, card_name: str, profile_name: str) -> None:
"""Set the active profile for a card.
For Bluetooth devices, common profiles include:
- a2dp-sink: High quality audio output (A2DP)
- headset-head-unit: Lower quality but with microphone (HFP/HSP)
- off: Disable the device
"""
await self._ensure_connected()
assert self._pulse is not None
for card in await self._pulse.card_list():
if card.name == card_name:
await self._pulse.card_profile_set(card, profile_name)
return
raise ValueError(f"Card not found: {card_name}")
async def find_bluetooth_sink(self, address: str) -> AudioSink | None:
"""Find a sink by Bluetooth address."""
address = address.upper()
for sink in await self.list_sinks():
if sink.bluetooth_address == address:
return sink
return None
async def find_bluetooth_card(self, address: str) -> AudioCard | None:
"""Find a card by Bluetooth address."""
address = address.upper()
for card in await self.list_cards():
if card.bluetooth_address == address:
return card
return None
# Global client instance
_audio_client: PulseAudioClient | None = None
async def get_audio_client() -> PulseAudioClient:
"""Get or create the global audio client."""
global _audio_client
if _audio_client is None:
_audio_client = PulseAudioClient()
await _audio_client.connect()
return _audio_client

View File

@ -0,0 +1,589 @@
"""BlueZ D-Bus client wrapper using dbus-fast.
This module provides an async interface to the BlueZ Bluetooth stack via D-Bus.
BlueZ exposes its functionality through several D-Bus interfaces:
- org.bluez.Adapter1: Adapter control (power, discoverable, scanning)
- org.bluez.Device1: Device management (pair, connect, trust)
- org.bluez.GattService1: BLE GATT services
- org.bluez.GattCharacteristic1: BLE GATT characteristics
- org.bluez.Agent1: Pairing agent for PIN/confirmation handling
- org.bluez.AgentManager1: Agent registration
Object paths follow this pattern:
- /org/bluez/hci0 - Adapter
- /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX - Device
- /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX/serviceXXXX - GATT service
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any
from dbus_fast import BusType, Variant
from dbus_fast.aio import MessageBus, ProxyInterface
# BlueZ constants
BLUEZ_SERVICE = "org.bluez"
BLUEZ_ADAPTER_IFACE = "org.bluez.Adapter1"
BLUEZ_DEVICE_IFACE = "org.bluez.Device1"
BLUEZ_GATT_SERVICE_IFACE = "org.bluez.GattService1"
BLUEZ_GATT_CHAR_IFACE = "org.bluez.GattCharacteristic1"
BLUEZ_GATT_DESC_IFACE = "org.bluez.GattDescriptor1"
BLUEZ_BATTERY_IFACE = "org.bluez.Battery1"
BLUEZ_AGENT_IFACE = "org.bluez.Agent1"
BLUEZ_AGENT_MANAGER_IFACE = "org.bluez.AgentManager1"
DBUS_PROPS_IFACE = "org.freedesktop.DBus.Properties"
DBUS_OBJMANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
def address_to_path(adapter: str, address: str) -> str:
"""Convert a Bluetooth address to D-Bus object path."""
addr_path = address.upper().replace(":", "_")
return f"/org/bluez/{adapter}/dev_{addr_path}"
def path_to_address(path: str) -> str:
"""Extract Bluetooth address from D-Bus object path."""
# /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX -> XX:XX:XX:XX:XX:XX
parts = path.split("/")
if len(parts) >= 5 and parts[-1].startswith("dev_"):
return parts[-1][4:].replace("_", ":")
return ""
@dataclass
class AdapterInfo:
"""Information about a Bluetooth adapter."""
path: str
name: str # e.g., "hci0"
address: str
alias: str
powered: bool
discoverable: bool
discoverable_timeout: int
pairable: bool
pairable_timeout: int
discovering: bool
uuids: list[str] = field(default_factory=list)
modalias: str = ""
device_class: int = 0
@dataclass
class DeviceInfo:
"""Information about a Bluetooth device."""
path: str
adapter: str
address: str
name: str
alias: str
paired: bool
bonded: bool
trusted: bool
blocked: bool
connected: bool
rssi: int | None
tx_power: int | None
uuids: list[str] = field(default_factory=list)
device_class: int = 0
appearance: int = 0
icon: str = ""
services_resolved: bool = False
manufacturer_data: dict[int, bytes] = field(default_factory=dict)
service_data: dict[str, bytes] = field(default_factory=dict)
@dataclass
class GattService:
"""Information about a GATT service."""
path: str
uuid: str
primary: bool
device_path: str
@dataclass
class GattCharacteristic:
"""Information about a GATT characteristic."""
path: str
uuid: str
service_path: str
flags: list[str] = field(default_factory=list)
notifying: bool = False
class BlueZClient:
"""Async client for BlueZ D-Bus API."""
def __init__(self):
self._bus: MessageBus | None = None
self._object_manager: ProxyInterface | None = None
self._objects_cache: dict[str, dict[str, dict[str, Any]]] = {}
async def connect(self) -> None:
"""Connect to the system D-Bus."""
if self._bus is not None:
return
self._bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
# Get the ObjectManager for BlueZ
introspection = await self._bus.introspect(BLUEZ_SERVICE, "/")
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, "/", introspection)
self._object_manager = proxy.get_interface(DBUS_OBJMANAGER_IFACE)
async def disconnect(self) -> None:
"""Disconnect from D-Bus."""
if self._bus:
self._bus.disconnect()
self._bus = None
self._object_manager = None
async def _ensure_connected(self) -> None:
"""Ensure we're connected to D-Bus."""
if self._bus is None:
await self.connect()
async def _get_managed_objects(self) -> dict[str, dict[str, dict[str, Any]]]:
"""Get all managed objects from BlueZ."""
await self._ensure_connected()
assert self._object_manager is not None
objects = await self._object_manager.call_get_managed_objects()
# Convert Variant values to Python types
result = {}
for path, interfaces in objects.items():
result[path] = {}
for iface, props in interfaces.items():
result[path][iface] = {
k: v.value if isinstance(v, Variant) else v for k, v in props.items()
}
return result
async def _get_interface(self, path: str, interface: str) -> ProxyInterface:
"""Get a proxy interface for an object."""
await self._ensure_connected()
assert self._bus is not None
introspection = await self._bus.introspect(BLUEZ_SERVICE, path)
proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, path, introspection)
return proxy.get_interface(interface)
async def _get_property(self, path: str, interface: str, prop: str) -> Any:
"""Get a single property from an object."""
props_iface = await self._get_interface(path, DBUS_PROPS_IFACE)
result = await props_iface.call_get(interface, prop)
return result.value if isinstance(result, Variant) else result
async def _set_property(self, path: str, interface: str, prop: str, value: Any) -> None:
"""Set a single property on an object."""
props_iface = await self._get_interface(path, DBUS_PROPS_IFACE)
# Wrap value in appropriate Variant
if isinstance(value, bool):
variant = Variant("b", value)
elif isinstance(value, int):
variant = Variant("u", value)
elif isinstance(value, str):
variant = Variant("s", value)
else:
variant = Variant("v", value)
await props_iface.call_set(interface, prop, variant)
async def _get_all_properties(self, path: str, interface: str) -> dict[str, Any]:
"""Get all properties from an object."""
props_iface = await self._get_interface(path, DBUS_PROPS_IFACE)
props = await props_iface.call_get_all(interface)
return {k: v.value if isinstance(v, Variant) else v for k, v in props.items()}
# ==================== Adapter Operations ====================
async def list_adapters(self) -> list[AdapterInfo]:
"""List all Bluetooth adapters."""
objects = await self._get_managed_objects()
adapters = []
for path, interfaces in objects.items():
if BLUEZ_ADAPTER_IFACE in interfaces:
props = interfaces[BLUEZ_ADAPTER_IFACE]
# Extract adapter name from path: /org/bluez/hci0 -> hci0
name = path.split("/")[-1]
adapters.append(
AdapterInfo(
path=path,
name=name,
address=props.get("Address", ""),
alias=props.get("Alias", ""),
powered=props.get("Powered", False),
discoverable=props.get("Discoverable", False),
discoverable_timeout=props.get("DiscoverableTimeout", 0),
pairable=props.get("Pairable", False),
pairable_timeout=props.get("PairableTimeout", 0),
discovering=props.get("Discovering", False),
uuids=props.get("UUIDs", []),
modalias=props.get("Modalias", ""),
device_class=props.get("Class", 0),
)
)
return adapters
async def get_adapter(self, adapter: str) -> AdapterInfo | None:
"""Get info for a specific adapter."""
adapters = await self.list_adapters()
for a in adapters:
if a.name == adapter:
return a
return None
async def set_adapter_power(self, adapter: str, powered: bool) -> None:
"""Set adapter power state."""
path = f"/org/bluez/{adapter}"
await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Powered", powered)
async def set_adapter_discoverable(
self, adapter: str, discoverable: bool, timeout: int = 0
) -> None:
"""Set adapter discoverable state."""
path = f"/org/bluez/{adapter}"
if timeout > 0:
await self._set_property(
path, BLUEZ_ADAPTER_IFACE, "DiscoverableTimeout", timeout
)
await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Discoverable", discoverable)
async def set_adapter_pairable(self, adapter: str, pairable: bool, timeout: int = 0) -> None:
"""Set adapter pairable state."""
path = f"/org/bluez/{adapter}"
if timeout > 0:
await self._set_property(path, BLUEZ_ADAPTER_IFACE, "PairableTimeout", timeout)
await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Pairable", pairable)
async def set_adapter_alias(self, adapter: str, alias: str) -> None:
"""Set adapter friendly name."""
path = f"/org/bluez/{adapter}"
await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Alias", alias)
# ==================== Discovery Operations ====================
async def start_discovery(self, adapter: str) -> None:
"""Start device discovery on an adapter."""
path = f"/org/bluez/{adapter}"
iface = await self._get_interface(path, BLUEZ_ADAPTER_IFACE)
await iface.call_start_discovery()
async def stop_discovery(self, adapter: str) -> None:
"""Stop device discovery on an adapter."""
path = f"/org/bluez/{adapter}"
iface = await self._get_interface(path, BLUEZ_ADAPTER_IFACE)
await iface.call_stop_discovery()
async def set_discovery_filter(
self,
adapter: str,
uuids: list[str] | None = None,
rssi: int | None = None,
transport: str = "auto",
duplicate_data: bool = False,
) -> None:
"""Set discovery filter for BLE scanning.
Args:
adapter: Adapter name (e.g., "hci0")
uuids: List of service UUIDs to filter by
rssi: Minimum RSSI threshold
transport: "auto", "bredr", or "le"
duplicate_data: Report duplicate advertisements
"""
path = f"/org/bluez/{adapter}"
iface = await self._get_interface(path, BLUEZ_ADAPTER_IFACE)
filter_dict: dict[str, Variant] = {}
if uuids:
filter_dict["UUIDs"] = Variant("as", uuids)
if rssi is not None:
filter_dict["RSSI"] = Variant("n", rssi)
filter_dict["Transport"] = Variant("s", transport)
filter_dict["DuplicateData"] = Variant("b", duplicate_data)
await iface.call_set_discovery_filter(filter_dict)
async def remove_discovery_filter(self, adapter: str) -> None:
"""Remove discovery filter."""
path = f"/org/bluez/{adapter}"
iface = await self._get_interface(path, BLUEZ_ADAPTER_IFACE)
# Set empty filter to clear
await iface.call_set_discovery_filter({})
# ==================== Device Operations ====================
async def list_devices(
self,
adapter: str | None = None,
filter_type: str = "all",
) -> list[DeviceInfo]:
"""List devices, optionally filtered.
Args:
adapter: Filter to devices on this adapter, or None for all
filter_type: "all", "paired", "connected", or "trusted"
"""
objects = await self._get_managed_objects()
devices = []
for path, interfaces in objects.items():
if BLUEZ_DEVICE_IFACE not in interfaces:
continue
# Filter by adapter
if adapter and not path.startswith(f"/org/bluez/{adapter}/"):
continue
props = interfaces[BLUEZ_DEVICE_IFACE]
# Apply filter
if filter_type == "paired" and not props.get("Paired", False):
continue
if filter_type == "connected" and not props.get("Connected", False):
continue
if filter_type == "trusted" and not props.get("Trusted", False):
continue
# Extract adapter name from path
adapter_name = path.split("/")[3] if len(path.split("/")) > 3 else ""
# Parse manufacturer data
mfr_data = {}
if "ManufacturerData" in props:
for k, v in props["ManufacturerData"].items():
mfr_data[k] = bytes(v) if isinstance(v, (list, bytearray)) else v
# Parse service data
svc_data = {}
if "ServiceData" in props:
for k, v in props["ServiceData"].items():
svc_data[k] = bytes(v) if isinstance(v, (list, bytearray)) else v
devices.append(
DeviceInfo(
path=path,
adapter=adapter_name,
address=props.get("Address", ""),
name=props.get("Name", ""),
alias=props.get("Alias", ""),
paired=props.get("Paired", False),
bonded=props.get("Bonded", False),
trusted=props.get("Trusted", False),
blocked=props.get("Blocked", False),
connected=props.get("Connected", False),
rssi=props.get("RSSI"),
tx_power=props.get("TxPower"),
uuids=props.get("UUIDs", []),
device_class=props.get("Class", 0),
appearance=props.get("Appearance", 0),
icon=props.get("Icon", ""),
services_resolved=props.get("ServicesResolved", False),
manufacturer_data=mfr_data,
service_data=svc_data,
)
)
return devices
async def get_device(self, adapter: str, address: str) -> DeviceInfo | None:
"""Get info for a specific device."""
devices = await self.list_devices(adapter=adapter)
for d in devices:
if d.address.upper() == address.upper():
return d
return None
async def pair_device(self, adapter: str, address: str) -> None:
"""Initiate pairing with a device.
Note: This may require an agent for PIN entry or confirmation.
"""
path = address_to_path(adapter, address)
iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
await iface.call_pair()
async def cancel_pairing(self, adapter: str, address: str) -> None:
"""Cancel ongoing pairing."""
path = address_to_path(adapter, address)
iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
await iface.call_cancel_pairing()
async def connect_device(self, adapter: str, address: str) -> None:
"""Connect to a device."""
path = address_to_path(adapter, address)
iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
await iface.call_connect()
async def disconnect_device(self, adapter: str, address: str) -> None:
"""Disconnect from a device."""
path = address_to_path(adapter, address)
iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
await iface.call_disconnect()
async def remove_device(self, adapter: str, address: str) -> None:
"""Remove a device from the known devices list."""
device_path = address_to_path(adapter, address)
adapter_path = f"/org/bluez/{adapter}"
iface = await self._get_interface(adapter_path, BLUEZ_ADAPTER_IFACE)
await iface.call_remove_device(device_path)
async def set_device_trusted(self, adapter: str, address: str, trusted: bool) -> None:
"""Set device trusted state."""
path = address_to_path(adapter, address)
await self._set_property(path, BLUEZ_DEVICE_IFACE, "Trusted", trusted)
async def set_device_blocked(self, adapter: str, address: str, blocked: bool) -> None:
"""Set device blocked state."""
path = address_to_path(adapter, address)
await self._set_property(path, BLUEZ_DEVICE_IFACE, "Blocked", blocked)
async def set_device_alias(self, adapter: str, address: str, alias: str) -> None:
"""Set device friendly name."""
path = address_to_path(adapter, address)
await self._set_property(path, BLUEZ_DEVICE_IFACE, "Alias", alias)
# ==================== Profile Operations ====================
async def connect_profile(self, adapter: str, address: str, uuid: str) -> None:
"""Connect a specific profile."""
path = address_to_path(adapter, address)
iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
await iface.call_connect_profile(uuid)
async def disconnect_profile(self, adapter: str, address: str, uuid: str) -> None:
"""Disconnect a specific profile."""
path = address_to_path(adapter, address)
iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
await iface.call_disconnect_profile(uuid)
# ==================== GATT Operations ====================
async def list_gatt_services(self, adapter: str, address: str) -> list[GattService]:
"""List GATT services for a device."""
device_path = address_to_path(adapter, address)
objects = await self._get_managed_objects()
services = []
for path, interfaces in objects.items():
if not path.startswith(device_path + "/"):
continue
if BLUEZ_GATT_SERVICE_IFACE not in interfaces:
continue
props = interfaces[BLUEZ_GATT_SERVICE_IFACE]
services.append(
GattService(
path=path,
uuid=props.get("UUID", ""),
primary=props.get("Primary", True),
device_path=props.get("Device", device_path),
)
)
return services
async def list_gatt_characteristics(
self, adapter: str, address: str, service_uuid: str | None = None
) -> list[GattCharacteristic]:
"""List GATT characteristics for a device, optionally filtered by service."""
device_path = address_to_path(adapter, address)
objects = await self._get_managed_objects()
# First find services if filtering
service_paths = set()
if service_uuid:
for path, interfaces in objects.items():
if BLUEZ_GATT_SERVICE_IFACE in interfaces:
if interfaces[BLUEZ_GATT_SERVICE_IFACE].get("UUID", "").lower() == service_uuid.lower():
service_paths.add(path)
characteristics = []
for path, interfaces in objects.items():
if not path.startswith(device_path + "/"):
continue
if BLUEZ_GATT_CHAR_IFACE not in interfaces:
continue
props = interfaces[BLUEZ_GATT_CHAR_IFACE]
service_path = props.get("Service", "")
# Filter by service if specified
if service_uuid and service_path not in service_paths:
continue
characteristics.append(
GattCharacteristic(
path=path,
uuid=props.get("UUID", ""),
service_path=service_path,
flags=props.get("Flags", []),
notifying=props.get("Notifying", False),
)
)
return characteristics
async def read_characteristic(self, char_path: str) -> bytes:
"""Read a GATT characteristic value."""
iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE)
result = await iface.call_read_value({})
return bytes(result)
async def write_characteristic(
self, char_path: str, value: bytes, write_type: str = "request"
) -> None:
"""Write a GATT characteristic value.
Args:
char_path: D-Bus path of the characteristic
value: Bytes to write
write_type: "request" (with response) or "command" (without response)
"""
iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE)
options = {"type": Variant("s", write_type)}
await iface.call_write_value(list(value), options)
async def start_notify(self, char_path: str) -> None:
"""Start notifications for a characteristic."""
iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE)
await iface.call_start_notify()
async def stop_notify(self, char_path: str) -> None:
"""Stop notifications for a characteristic."""
iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE)
await iface.call_stop_notify()
# ==================== Battery Operations ====================
async def get_battery_level(self, adapter: str, address: str) -> int | None:
"""Get battery level from Battery1 interface if available."""
path = address_to_path(adapter, address)
objects = await self._get_managed_objects()
if path in objects and BLUEZ_BATTERY_IFACE in objects[path]:
return objects[path][BLUEZ_BATTERY_IFACE].get("Percentage")
return None
# Global client instance
_client: BlueZClient | None = None
async def get_client() -> BlueZClient:
"""Get or create the global BlueZ client."""
global _client
if _client is None:
_client = BlueZClient()
await _client.connect()
return _client

51
src/mcbluetooth/server.py Normal file
View File

@ -0,0 +1,51 @@
"""FastMCP server for BlueZ Bluetooth management."""
from fastmcp import FastMCP
from mcbluetooth.tools import adapter, audio, ble, device
mcp = FastMCP(
name="mcbluetooth",
instructions="""Bluetooth management server for Linux (BlueZ).
This server provides comprehensive control over the Linux Bluetooth stack:
- Adapter management (power, discoverable, pairable)
- Device discovery and management (scan, pair, connect)
- Audio profiles (A2DP, HFP) with PipeWire/PulseAudio integration
- BLE/GATT services (read/write characteristics, notifications)
All tools require an explicit 'adapter' parameter (e.g., "hci0").
Use bt_list_adapters() to discover available adapters.
For pairing, use pairing_mode parameter:
- "elicit": Use MCP elicitation to request PIN from user (preferred)
- "interactive": Return awaiting status, then call bt_pair_confirm
- "auto": Auto-accept pairings (use in trusted environments only)
""",
)
# Register tool modules
adapter.register_tools(mcp)
device.register_tools(mcp)
audio.register_tools(mcp)
ble.register_tools(mcp)
def main():
"""Entry point for the mcbluetooth MCP server."""
try:
from importlib.metadata import version
package_version = version("mcbluetooth")
except Exception:
package_version = "dev"
print(f"🔵 mcbluetooth v{package_version}")
print(" BlueZ MCP Server - Bluetooth management for LLMs")
print()
mcp.run()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,5 @@
"""MCP tool modules for Bluetooth management."""
from mcbluetooth.tools import adapter, audio, ble, device
__all__ = ["adapter", "device", "audio", "ble"]

View File

@ -0,0 +1,117 @@
"""Adapter management tools for BlueZ."""
from __future__ import annotations
from dataclasses import asdict
from typing import Any
from fastmcp import FastMCP
from mcbluetooth.dbus_client import get_client
def register_tools(mcp: FastMCP) -> None:
"""Register adapter management tools with the MCP server."""
@mcp.tool()
async def bt_list_adapters() -> list[dict[str, Any]]:
"""List all Bluetooth adapters on the system.
Returns a list of adapters with their properties including:
- name: Adapter identifier (e.g., "hci0")
- address: Bluetooth MAC address
- powered: Whether adapter is on
- discoverable: Whether adapter is visible to other devices
- discovering: Whether currently scanning
"""
client = await get_client()
adapters = await client.list_adapters()
return [asdict(a) for a in adapters]
@mcp.tool()
async def bt_adapter_info(adapter: str) -> dict[str, Any] | None:
"""Get detailed information about a specific adapter.
Args:
adapter: Adapter name (e.g., "hci0")
Returns:
Adapter properties or None if not found
"""
client = await get_client()
info = await client.get_adapter(adapter)
return asdict(info) if info else None
@mcp.tool()
async def bt_adapter_power(adapter: str, on: bool) -> dict[str, Any]:
"""Power an adapter on or off.
Args:
adapter: Adapter name (e.g., "hci0")
on: True to power on, False to power off
Returns:
Updated adapter info
"""
client = await get_client()
await client.set_adapter_power(adapter, on)
info = await client.get_adapter(adapter)
return asdict(info) if info else {"error": "Adapter not found"}
@mcp.tool()
async def bt_adapter_discoverable(
adapter: str,
on: bool,
timeout: int = 180,
) -> dict[str, Any]:
"""Set adapter discoverable (visible to other devices).
Args:
adapter: Adapter name (e.g., "hci0")
on: True to enable discoverable, False to disable
timeout: Discoverable timeout in seconds (0 = forever, default 180)
Returns:
Updated adapter info
"""
client = await get_client()
await client.set_adapter_discoverable(adapter, on, timeout)
info = await client.get_adapter(adapter)
return asdict(info) if info else {"error": "Adapter not found"}
@mcp.tool()
async def bt_adapter_pairable(
adapter: str,
on: bool,
timeout: int = 0,
) -> dict[str, Any]:
"""Set adapter pairable state.
Args:
adapter: Adapter name (e.g., "hci0")
on: True to enable pairing acceptance, False to disable
timeout: Pairable timeout in seconds (0 = forever)
Returns:
Updated adapter info
"""
client = await get_client()
await client.set_adapter_pairable(adapter, on, timeout)
info = await client.get_adapter(adapter)
return asdict(info) if info else {"error": "Adapter not found"}
@mcp.tool()
async def bt_adapter_set_alias(adapter: str, alias: str) -> dict[str, Any]:
"""Set adapter friendly name (alias).
Args:
adapter: Adapter name (e.g., "hci0")
alias: New friendly name for the adapter
Returns:
Updated adapter info
"""
client = await get_client()
await client.set_adapter_alias(adapter, alias)
info = await client.get_adapter(adapter)
return asdict(info) if info else {"error": "Adapter not found"}

View File

@ -0,0 +1,287 @@
"""Audio profile management tools for Bluetooth devices."""
from __future__ import annotations
from dataclasses import asdict
from typing import Any, Literal
from fastmcp import FastMCP
from mcbluetooth.audio import get_audio_client
from mcbluetooth.dbus_client import get_client
# Common Bluetooth audio UUIDs
A2DP_SINK_UUID = "0000110b-0000-1000-8000-00805f9b34fb"
A2DP_SOURCE_UUID = "0000110a-0000-1000-8000-00805f9b34fb"
HFP_HF_UUID = "0000111e-0000-1000-8000-00805f9b34fb"
HFP_AG_UUID = "0000111f-0000-1000-8000-00805f9b34fb"
HSP_HS_UUID = "00001108-0000-1000-8000-00805f9b34fb"
HSP_AG_UUID = "00001112-0000-1000-8000-00805f9b34fb"
def register_tools(mcp: FastMCP) -> None:
"""Register audio management tools with the MCP server."""
@mcp.tool()
async def bt_audio_list() -> dict[str, Any]:
"""List all audio devices including Bluetooth devices.
Returns information about:
- Sinks (audio outputs like speakers, headphones)
- Sources (audio inputs like microphones)
- Cards (devices with switchable profiles)
Bluetooth audio devices show their MAC address.
"""
audio = await get_audio_client()
sinks = await audio.list_sinks()
sources = await audio.list_sources()
cards = await audio.list_cards()
default_sink = await audio.get_default_sink()
default_source = await audio.get_default_source()
return {
"default_sink": default_sink,
"default_source": default_source,
"sinks": [asdict(s) for s in sinks],
"sources": [asdict(s) for s in sources],
"cards": [asdict(c) for c in cards],
"bluetooth_sinks": [asdict(s) for s in sinks if s.is_bluetooth],
"bluetooth_cards": [asdict(c) for c in cards if c.is_bluetooth],
}
@mcp.tool()
async def bt_audio_connect(adapter: str, address: str) -> dict[str, Any]:
"""Connect audio profiles to a Bluetooth device.
This connects the A2DP (high-quality audio) and/or HFP (hands-free)
profiles to the specified Bluetooth device.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
Returns:
Connection status and available audio info
"""
bt_client = await get_client()
# First ensure device is connected
device = await bt_client.get_device(adapter, address)
if not device:
return {"status": "error", "error": "Device not found"}
if not device.paired:
return {"status": "error", "error": "Device not paired. Pair first with bt_pair."}
if not device.connected:
try:
await bt_client.connect_device(adapter, address)
except Exception as e:
return {"status": "error", "error": f"Connection failed: {e}"}
# Try to connect audio profiles
errors = []
connected_profiles = []
for uuid, name in [(A2DP_SINK_UUID, "A2DP"), (HFP_HF_UUID, "HFP")]:
if uuid.lower() in [u.lower() for u in device.uuids]:
try:
await bt_client.connect_profile(adapter, address, uuid)
connected_profiles.append(name)
except Exception as e:
errors.append(f"{name}: {e}")
# Check audio system
audio = await get_audio_client()
sink = await audio.find_bluetooth_sink(address)
card = await audio.find_bluetooth_card(address)
return {
"status": "connected" if connected_profiles else "no_audio_profiles",
"connected_profiles": connected_profiles,
"errors": errors if errors else None,
"audio_sink": asdict(sink) if sink else None,
"audio_card": asdict(card) if card else None,
}
@mcp.tool()
async def bt_audio_disconnect(adapter: str, address: str) -> dict[str, Any]:
"""Disconnect audio profiles from a Bluetooth device.
This disconnects audio profiles but keeps the device connected
for other services (if any).
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
Returns:
Disconnection status
"""
bt_client = await get_client()
device = await bt_client.get_device(adapter, address)
if not device:
return {"status": "error", "error": "Device not found"}
errors = []
disconnected_profiles = []
for uuid, name in [(A2DP_SINK_UUID, "A2DP"), (HFP_HF_UUID, "HFP")]:
if uuid.lower() in [u.lower() for u in device.uuids]:
try:
await bt_client.disconnect_profile(adapter, address, uuid)
disconnected_profiles.append(name)
except Exception as e:
errors.append(f"{name}: {e}")
return {
"status": "disconnected" if disconnected_profiles else "no_audio_profiles",
"disconnected_profiles": disconnected_profiles,
"errors": errors if errors else None,
}
@mcp.tool()
async def bt_audio_set_profile(
address: str,
profile: Literal["a2dp", "hfp", "off"],
) -> dict[str, Any]:
"""Switch audio profile for a Bluetooth device.
Bluetooth audio devices support different profiles:
- a2dp: High-quality stereo audio (best for music)
- hfp: Hands-free profile with microphone (lower quality, for calls)
- off: Disable audio for this device
Args:
address: Device Bluetooth address
profile: Profile to activate
Returns:
Status and updated card info
"""
audio = await get_audio_client()
card = await audio.find_bluetooth_card(address)
if not card:
return {"status": "error", "error": "Bluetooth audio card not found"}
# Map profile names to PulseAudio profile names
# These vary by PipeWire/PulseAudio version
profile_map = {
"a2dp": ["a2dp-sink", "a2dp_sink", "a2dp-sink-aac", "a2dp-sink-sbc"],
"hfp": ["headset-head-unit", "headset_head_unit", "handsfree_head_unit"],
"off": ["off"],
}
target_profiles = profile_map[profile]
# Find a matching profile
matched_profile = None
for p in target_profiles:
if p in card.profiles:
matched_profile = p
break
if not matched_profile:
return {
"status": "error",
"error": f"Profile '{profile}' not available",
"available_profiles": card.profiles,
}
try:
await audio.set_card_profile(card.name, matched_profile)
# Refresh card info
card = await audio.find_bluetooth_card(address)
return {
"status": "profile_changed",
"profile": matched_profile,
"card": asdict(card) if card else None,
}
except Exception as e:
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_audio_set_default(address: str) -> dict[str, Any]:
"""Set a Bluetooth device as the default audio output.
Args:
address: Device Bluetooth address
Returns:
Status and updated default sink info
"""
audio = await get_audio_client()
sink = await audio.find_bluetooth_sink(address)
if not sink:
return {"status": "error", "error": "Bluetooth audio sink not found"}
try:
await audio.set_default_sink(sink.name)
return {
"status": "default_set",
"sink_name": sink.name,
"description": sink.description,
}
except Exception as e:
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_audio_volume(address: str, volume: int) -> dict[str, Any]:
"""Set volume for a Bluetooth audio device.
Args:
address: Device Bluetooth address
volume: Volume level 0-100 (can go up to 150 for boost)
Returns:
Status and updated volume info
"""
audio = await get_audio_client()
sink = await audio.find_bluetooth_sink(address)
if not sink:
return {"status": "error", "error": "Bluetooth audio sink not found"}
try:
await audio.set_sink_volume(sink.name, volume)
# Refresh sink info
sink = await audio.find_bluetooth_sink(address)
return {
"status": "volume_set",
"volume": sink.volume_percent if sink else volume,
"sink_name": sink.name if sink else None,
}
except Exception as e:
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_audio_mute(address: str, muted: bool) -> dict[str, Any]:
"""Mute or unmute a Bluetooth audio device.
Args:
address: Device Bluetooth address
muted: True to mute, False to unmute
Returns:
Status and mute state
"""
audio = await get_audio_client()
sink = await audio.find_bluetooth_sink(address)
if not sink:
return {"status": "error", "error": "Bluetooth audio sink not found"}
try:
await audio.set_sink_mute(sink.name, muted)
return {
"status": "muted" if muted else "unmuted",
"sink_name": sink.name,
}
except Exception as e:
return {"status": "error", "error": str(e)}

View File

@ -0,0 +1,403 @@
"""Bluetooth Low Energy (BLE) and GATT tools."""
from __future__ import annotations
import asyncio
from typing import Any
from fastmcp import FastMCP
from mcbluetooth.dbus_client import get_client
# Common BLE service UUIDs
BATTERY_SERVICE_UUID = "0000180f-0000-1000-8000-00805f9b34fb"
BATTERY_LEVEL_CHAR_UUID = "00002a19-0000-1000-8000-00805f9b34fb"
DEVICE_INFO_SERVICE_UUID = "0000180a-0000-1000-8000-00805f9b34fb"
HEART_RATE_SERVICE_UUID = "0000180d-0000-1000-8000-00805f9b34fb"
HEART_RATE_MEASUREMENT_UUID = "00002a37-0000-1000-8000-00805f9b34fb"
def _format_uuid(uuid: str) -> str:
"""Format UUID for display - short form for standard UUIDs."""
# Standard Bluetooth UUIDs have the form 0000XXXX-0000-1000-8000-00805f9b34fb
if uuid.lower().endswith("-0000-1000-8000-00805f9b34fb"):
short = uuid[:8].lstrip("0") or "0"
return f"0x{short.upper()}"
return uuid
def register_tools(mcp: FastMCP) -> None:
"""Register BLE/GATT tools with the MCP server."""
@mcp.tool()
async def bt_ble_scan(
adapter: str,
timeout: int = 10,
name_filter: str | None = None,
service_filter: str | None = None,
) -> list[dict[str, Any]]:
"""Scan for BLE (Bluetooth Low Energy) devices.
BLE devices advertise their presence and can be filtered by name
or service UUID. Common services include:
- 0x180F: Battery Service
- 0x180D: Heart Rate Service
- 0x180A: Device Information
Args:
adapter: Adapter name (e.g., "hci0")
timeout: Scan duration in seconds
name_filter: Only return devices with name containing this string
service_filter: Only return devices advertising this service UUID
Returns:
List of BLE devices with advertisement data
"""
client = await get_client()
# Set BLE-specific filter
uuids = [service_filter] if service_filter else None
await client.set_discovery_filter(
adapter, uuids=uuids, transport="le", duplicate_data=True
)
await client.start_discovery(adapter)
try:
await asyncio.sleep(timeout)
finally:
await client.stop_discovery(adapter)
await client.remove_discovery_filter(adapter)
devices = await client.list_devices(adapter=adapter)
# Apply name filter
if name_filter:
name_filter_lower = name_filter.lower()
devices = [d for d in devices if name_filter_lower in d.name.lower()]
# Format for BLE-specific output
result = []
for d in devices:
# Parse manufacturer data for display
mfr_data_hex = {
k: v.hex() if isinstance(v, bytes) else str(v)
for k, v in d.manufacturer_data.items()
}
svc_data_hex = {
k: v.hex() if isinstance(v, bytes) else str(v)
for k, v in d.service_data.items()
}
result.append({
"address": d.address,
"name": d.name or "(unknown)",
"rssi": d.rssi,
"paired": d.paired,
"connected": d.connected,
"uuids": [_format_uuid(u) for u in d.uuids],
"manufacturer_data": mfr_data_hex,
"service_data": svc_data_hex,
"appearance": d.appearance,
})
return result
@mcp.tool()
async def bt_ble_services(adapter: str, address: str) -> list[dict[str, Any]]:
"""List GATT services for a connected BLE device.
The device must be connected for service discovery.
Services contain characteristics that can be read/written.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
Returns:
List of GATT services with their UUIDs
"""
client = await get_client()
# Check device is connected
device = await client.get_device(adapter, address)
if not device:
return [{"error": "Device not found"}]
if not device.connected:
return [{"error": "Device not connected. Use bt_connect first."}]
if not device.services_resolved:
# Wait a bit for services to resolve
await asyncio.sleep(2)
device = await client.get_device(adapter, address)
if not device or not device.services_resolved:
return [{"error": "Services not yet resolved. Try again in a moment."}]
services = await client.list_gatt_services(adapter, address)
return [
{
"uuid": s.uuid,
"uuid_short": _format_uuid(s.uuid),
"primary": s.primary,
"path": s.path,
}
for s in services
]
@mcp.tool()
async def bt_ble_characteristics(
adapter: str,
address: str,
service_uuid: str | None = None,
) -> list[dict[str, Any]]:
"""List GATT characteristics for a BLE device.
Characteristics are the data points within a service that can be
read, written, or subscribed to for notifications.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
service_uuid: Filter to characteristics of this service (optional)
Returns:
List of characteristics with their properties
"""
client = await get_client()
device = await client.get_device(adapter, address)
if not device or not device.connected:
return [{"error": "Device not connected"}]
chars = await client.list_gatt_characteristics(adapter, address, service_uuid)
return [
{
"uuid": c.uuid,
"uuid_short": _format_uuid(c.uuid),
"path": c.path,
"flags": c.flags,
"notifying": c.notifying,
"readable": "read" in c.flags,
"writable": "write" in c.flags or "write-without-response" in c.flags,
"notifiable": "notify" in c.flags or "indicate" in c.flags,
}
for c in chars
]
@mcp.tool()
async def bt_ble_read(
adapter: str,
address: str,
char_uuid: str,
) -> dict[str, Any]:
"""Read a GATT characteristic value.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
char_uuid: Characteristic UUID to read
Returns:
The characteristic value as hex and decoded (if possible)
"""
client = await get_client()
# Find the characteristic
chars = await client.list_gatt_characteristics(adapter, address)
target_char = None
for c in chars:
if c.uuid.lower() == char_uuid.lower():
target_char = c
break
if not target_char:
return {"error": f"Characteristic {char_uuid} not found"}
if "read" not in target_char.flags:
return {"error": "Characteristic is not readable"}
try:
value = await client.read_characteristic(target_char.path)
result = {
"uuid": char_uuid,
"value_hex": value.hex(),
"value_bytes": list(value),
"length": len(value),
}
# Try to decode as UTF-8 string
try:
result["value_string"] = value.decode("utf-8")
except UnicodeDecodeError:
pass
# Try to decode as integer (common for single-byte values)
if len(value) == 1:
result["value_int"] = value[0]
elif len(value) == 2:
result["value_int"] = int.from_bytes(value, "little")
return result
except Exception as e:
return {"error": str(e)}
@mcp.tool()
async def bt_ble_write(
adapter: str,
address: str,
char_uuid: str,
value: str,
value_type: str = "hex",
with_response: bool = True,
) -> dict[str, Any]:
"""Write a value to a GATT characteristic.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
char_uuid: Characteristic UUID to write
value: Value to write
value_type: How to interpret value - "hex" (e.g., "0102ff"),
"string" (UTF-8), or "int" (decimal number)
with_response: Whether to wait for write confirmation
Returns:
Write status
"""
client = await get_client()
# Find the characteristic
chars = await client.list_gatt_characteristics(adapter, address)
target_char = None
for c in chars:
if c.uuid.lower() == char_uuid.lower():
target_char = c
break
if not target_char:
return {"error": f"Characteristic {char_uuid} not found"}
if "write" not in target_char.flags and "write-without-response" not in target_char.flags:
return {"error": "Characteristic is not writable"}
# Convert value to bytes
try:
if value_type == "hex":
data = bytes.fromhex(value)
elif value_type == "string":
data = value.encode("utf-8")
elif value_type == "int":
int_val = int(value)
# Determine byte length needed
byte_len = max(1, (int_val.bit_length() + 7) // 8)
data = int_val.to_bytes(byte_len, "little")
else:
return {"error": f"Unknown value_type: {value_type}"}
except ValueError as e:
return {"error": f"Invalid value format: {e}"}
try:
write_type = "request" if with_response else "command"
await client.write_characteristic(target_char.path, data, write_type)
return {
"status": "written",
"uuid": char_uuid,
"value_hex": data.hex(),
"length": len(data),
}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
async def bt_ble_notify(
adapter: str,
address: str,
char_uuid: str,
enable: bool,
) -> dict[str, Any]:
"""Enable or disable notifications for a characteristic.
When enabled, the device will send updates when the characteristic
value changes (e.g., heart rate measurements, sensor data).
Note: To receive actual notifications, you would need to set up
a callback - this tool just enables/disables the notification mode.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
char_uuid: Characteristic UUID
enable: True to enable notifications, False to disable
Returns:
Notification status
"""
client = await get_client()
chars = await client.list_gatt_characteristics(adapter, address)
target_char = None
for c in chars:
if c.uuid.lower() == char_uuid.lower():
target_char = c
break
if not target_char:
return {"error": f"Characteristic {char_uuid} not found"}
if "notify" not in target_char.flags and "indicate" not in target_char.flags:
return {"error": "Characteristic does not support notifications"}
try:
if enable:
await client.start_notify(target_char.path)
else:
await client.stop_notify(target_char.path)
return {
"status": "notifications_enabled" if enable else "notifications_disabled",
"uuid": char_uuid,
}
except Exception as e:
return {"error": str(e)}
@mcp.tool()
async def bt_ble_battery(adapter: str, address: str) -> dict[str, Any]:
"""Read battery level from a BLE device.
Many BLE devices support the standard Battery Service (0x180F).
This is a convenience function that reads from that service.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
Returns:
Battery percentage (0-100) or error
"""
client = await get_client()
# First try BlueZ Battery1 interface (cached value)
battery_level = await client.get_battery_level(adapter, address)
if battery_level is not None:
return {"battery_percent": battery_level, "source": "bluez_battery1"}
# Fall back to reading GATT characteristic
device = await client.get_device(adapter, address)
if not device or not device.connected:
return {"error": "Device not connected"}
chars = await client.list_gatt_characteristics(adapter, address, BATTERY_SERVICE_UUID)
battery_char = None
for c in chars:
if c.uuid.lower() == BATTERY_LEVEL_CHAR_UUID.lower():
battery_char = c
break
if not battery_char:
return {"error": "Battery service not available on this device"}
try:
value = await client.read_characteristic(battery_char.path)
if len(value) >= 1:
return {"battery_percent": value[0], "source": "gatt_characteristic"}
return {"error": "Invalid battery value"}
except Exception as e:
return {"error": str(e)}

View File

@ -0,0 +1,314 @@
"""Device discovery and management tools for BlueZ."""
from __future__ import annotations
import asyncio
from dataclasses import asdict
from typing import Any, Literal
from fastmcp import FastMCP
from mcbluetooth.dbus_client import get_client
def register_tools(mcp: FastMCP) -> None:
"""Register device management tools with the MCP server."""
@mcp.tool()
async def bt_scan(
adapter: str,
timeout: int = 10,
mode: Literal["classic", "ble", "both"] = "both",
) -> list[dict[str, Any]]:
"""Scan for nearby Bluetooth devices.
Starts discovery, waits for the specified timeout, then returns
all discovered devices. For BLE devices, ensure they are advertising.
Args:
adapter: Adapter name (e.g., "hci0")
timeout: Scan duration in seconds (default 10)
mode: Scan mode - "classic" (BR/EDR), "ble" (Low Energy), or "both"
Returns:
List of discovered devices with their properties
"""
client = await get_client()
# Set discovery filter based on mode
transport = {"classic": "bredr", "ble": "le", "both": "auto"}[mode]
await client.set_discovery_filter(adapter, transport=transport, duplicate_data=True)
# Start discovery
await client.start_discovery(adapter)
try:
# Wait for scan duration
await asyncio.sleep(timeout)
finally:
# Always stop discovery
await client.stop_discovery(adapter)
await client.remove_discovery_filter(adapter)
# Return discovered devices
devices = await client.list_devices(adapter=adapter)
return [asdict(d) for d in devices]
@mcp.tool()
async def bt_list_devices(
adapter: str,
filter: Literal["all", "paired", "connected", "trusted"] = "all",
) -> list[dict[str, Any]]:
"""List known Bluetooth devices.
Returns devices that have been discovered or paired previously.
Use bt_scan to discover new devices.
Args:
adapter: Adapter name (e.g., "hci0")
filter: Filter type - "all", "paired", "connected", or "trusted"
Returns:
List of devices matching the filter
"""
client = await get_client()
devices = await client.list_devices(adapter=adapter, filter_type=filter)
return [asdict(d) for d in devices]
@mcp.tool()
async def bt_device_info(adapter: str, address: str) -> dict[str, Any] | None:
"""Get detailed information about a specific device.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address (e.g., "AA:BB:CC:DD:EE:FF")
Returns:
Device properties or None if not found
"""
client = await get_client()
info = await client.get_device(adapter, address)
return asdict(info) if info else None
@mcp.tool()
async def bt_pair(
adapter: str,
address: str,
pairing_mode: Literal["elicit", "interactive", "auto"] = "interactive",
) -> dict[str, Any]:
"""Initiate pairing with a device.
Pairing establishes a trusted relationship with a device. Some devices
require PIN entry or confirmation during pairing.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
pairing_mode: How to handle pairing requests:
- "elicit": Use MCP elicitation to request PIN from user (if supported)
- "interactive": Return status, then call bt_pair_confirm with PIN
- "auto": Auto-accept pairings (for trusted environments)
Returns:
Pairing status including whether confirmation is needed
"""
client = await get_client()
# Check if already paired
device = await client.get_device(adapter, address)
if device and device.paired:
return {"status": "already_paired", "device": asdict(device)}
if pairing_mode == "auto":
# Direct pairing without agent - may fail if PIN required
try:
await client.pair_device(adapter, address)
device = await client.get_device(adapter, address)
return {"status": "paired", "device": asdict(device) if device else None}
except Exception as e:
return {"status": "error", "error": str(e)}
else:
# For interactive/elicit modes, we need an agent
# Return status indicating pairing initiated
try:
# Start pairing (this is async and may block for confirmation)
# In a real implementation, this would register an agent
await client.pair_device(adapter, address)
device = await client.get_device(adapter, address)
return {"status": "paired", "device": asdict(device) if device else None}
except Exception as e:
error_msg = str(e)
if "AuthenticationFailed" in error_msg or "AuthenticationCanceled" in error_msg:
return {
"status": "awaiting_confirmation",
"message": "Pairing requires user confirmation or PIN entry",
"pairing_mode": pairing_mode,
}
return {"status": "error", "error": error_msg}
@mcp.tool()
async def bt_pair_confirm(
adapter: str,
address: str,
pin: str | None = None,
accept: bool = True,
) -> dict[str, Any]:
"""Confirm or reject a pairing request.
Use this after bt_pair returns "awaiting_confirmation" status.
Args:
adapter: Adapter name
address: Device Bluetooth address
pin: PIN code if required (usually 4-6 digits)
accept: True to accept pairing, False to reject
Returns:
Pairing result
"""
client = await get_client()
if not accept:
try:
await client.cancel_pairing(adapter, address)
return {"status": "pairing_cancelled"}
except Exception as e:
return {"status": "error", "error": str(e)}
# Re-attempt pairing - agent would handle PIN
# For now, this is a simplified implementation
try:
await client.pair_device(adapter, address)
device = await client.get_device(adapter, address)
return {"status": "paired", "device": asdict(device) if device else None}
except Exception as e:
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_unpair(adapter: str, address: str) -> dict[str, str]:
"""Remove pairing with a device.
This removes the device from the list of known devices and
deletes all pairing information.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
Returns:
Status of the operation
"""
client = await get_client()
try:
await client.remove_device(adapter, address)
return {"status": "removed", "address": address}
except Exception as e:
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_connect(adapter: str, address: str) -> dict[str, Any]:
"""Connect to a paired device.
Establishes an active connection to a previously paired device.
For audio devices, this will also connect audio profiles.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
Returns:
Connection status and device info
"""
client = await get_client()
try:
await client.connect_device(adapter, address)
device = await client.get_device(adapter, address)
return {"status": "connected", "device": asdict(device) if device else None}
except Exception as e:
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_disconnect(adapter: str, address: str) -> dict[str, Any]:
"""Disconnect from a device.
Terminates the active connection but preserves pairing.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
Returns:
Status of the operation
"""
client = await get_client()
try:
await client.disconnect_device(adapter, address)
device = await client.get_device(adapter, address)
return {"status": "disconnected", "device": asdict(device) if device else None}
except Exception as e:
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_trust(adapter: str, address: str, trusted: bool) -> dict[str, Any]:
"""Set device trust status.
Trusted devices can connect automatically without explicit
authorization.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
trusted: True to trust, False to untrust
Returns:
Updated device info
"""
client = await get_client()
try:
await client.set_device_trusted(adapter, address, trusted)
device = await client.get_device(adapter, address)
return {"status": "updated", "device": asdict(device) if device else None}
except Exception as e:
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_block(adapter: str, address: str, blocked: bool) -> dict[str, Any]:
"""Block or unblock a device.
Blocked devices cannot connect to this adapter.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
blocked: True to block, False to unblock
Returns:
Updated device info
"""
client = await get_client()
try:
await client.set_device_blocked(adapter, address, blocked)
device = await client.get_device(adapter, address)
return {"status": "updated", "device": asdict(device) if device else None}
except Exception as e:
return {"status": "error", "error": str(e)}
@mcp.tool()
async def bt_device_set_alias(adapter: str, address: str, alias: str) -> dict[str, Any]:
"""Set a friendly name for a device.
Args:
adapter: Adapter name (e.g., "hci0")
address: Device Bluetooth address
alias: New friendly name
Returns:
Updated device info
"""
client = await get_client()
try:
await client.set_device_alias(adapter, address, alias)
device = await client.get_device(adapter, address)
return {"status": "updated", "device": asdict(device) if device else None}
except Exception as e:
return {"status": "error", "error": str(e)}

1
tests/__init__.py Normal file
View File

@ -0,0 +1 @@
"""Tests for mcbluetooth."""

93
tests/test_dbus_client.py Normal file
View File

@ -0,0 +1,93 @@
"""Tests for the BlueZ D-Bus client."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from mcbluetooth.dbus_client import (
BlueZClient,
AdapterInfo,
DeviceInfo,
address_to_path,
path_to_address,
)
class TestPathConversion:
"""Test address/path conversion utilities."""
def test_address_to_path(self):
assert address_to_path("hci0", "AA:BB:CC:DD:EE:FF") == "/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF"
assert address_to_path("hci1", "11:22:33:44:55:66") == "/org/bluez/hci1/dev_11_22_33_44_55_66"
def test_address_to_path_lowercase(self):
# Should uppercase the address
assert address_to_path("hci0", "aa:bb:cc:dd:ee:ff") == "/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF"
def test_path_to_address(self):
assert path_to_address("/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF") == "AA:BB:CC:DD:EE:FF"
assert path_to_address("/org/bluez/hci1/dev_11_22_33_44_55_66") == "11:22:33:44:55:66"
def test_path_to_address_invalid(self):
assert path_to_address("/org/bluez/hci0") == ""
assert path_to_address("/invalid/path") == ""
class TestAdapterInfo:
"""Test AdapterInfo dataclass."""
def test_adapter_info_creation(self):
info = AdapterInfo(
path="/org/bluez/hci0",
name="hci0",
address="AA:BB:CC:DD:EE:FF",
alias="My Adapter",
powered=True,
discoverable=False,
discoverable_timeout=180,
pairable=True,
pairable_timeout=0,
discovering=False,
)
assert info.name == "hci0"
assert info.powered is True
assert info.discoverable is False
class TestDeviceInfo:
"""Test DeviceInfo dataclass."""
def test_device_info_creation(self):
info = DeviceInfo(
path="/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF",
adapter="hci0",
address="AA:BB:CC:DD:EE:FF",
name="Test Device",
alias="My Device",
paired=True,
bonded=True,
trusted=True,
blocked=False,
connected=True,
rssi=-50,
tx_power=4,
)
assert info.address == "AA:BB:CC:DD:EE:FF"
assert info.paired is True
assert info.connected is True
assert info.rssi == -50
class TestBlueZClient:
"""Test BlueZClient methods with mocked D-Bus."""
@pytest.fixture
def client(self):
return BlueZClient()
@pytest.mark.asyncio
async def test_client_not_connected_initially(self, client):
assert client._bus is None
assert client._object_manager is None
# Additional tests would require more extensive mocking of dbus-fast
# which is better done as integration tests with a real BlueZ stack