commit 013cd0eb2f45454f5e3456a12c731d15e24edad8 Author: Ryan Malloy Date: Mon Feb 2 02:03:49 2026 -0700 Initial implementation of mcbluetooth MCP server Comprehensive BlueZ MCP server exposing Linux Bluetooth stack to LLMs: - Adapter management (list, power, discoverable, pairable) - Device discovery and management (scan, pair, connect, trust) - Audio profiles with PipeWire/PulseAudio integration (A2DP/HFP switching) - BLE/GATT support (services, characteristics, read/write/notify) Built on: - FastMCP 2.14.4 for MCP protocol - dbus-fast 4.0.0 for async BlueZ D-Bus communication - pulsectl-asyncio 1.2.2 for audio control diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..68ac800 --- /dev/null +++ b/.gitignore @@ -0,0 +1,46 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +.venv/ +venv/ +ENV/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ +.nox/ + +# uv +.uv/ +uv.lock + +# Project specific +*.log diff --git a/README.md b/README.md new file mode 100644 index 0000000..ea42847 --- /dev/null +++ b/README.md @@ -0,0 +1,154 @@ +# mcbluetooth + +A comprehensive MCP server exposing the Linux Bluetooth stack (BlueZ) to LLMs. + +## Features + +### Adapter Management +- List, power, and configure Bluetooth adapters +- Control discoverable and pairable states +- Set adapter aliases + +### Device Management +- Classic Bluetooth and BLE scanning with filters +- Pairing with multi-modal support (elicit, interactive, auto) +- Connect/disconnect/trust/block devices +- View device properties including RSSI, UUIDs, manufacturer data + +### Audio Profiles (A2DP/HFP) +- List audio devices (sinks, sources, cards) +- Connect/disconnect audio profiles +- Switch between A2DP (high quality) and HFP (calls with mic) +- Volume control and muting +- Set default audio device (PipeWire/PulseAudio integration) + +### Bluetooth Low Energy (BLE) +- BLE-specific scanning with name/service filters +- GATT service discovery +- Read/write characteristics +- Enable/disable notifications +- Battery level reading (standard Battery Service) + +## Installation + +```bash +# Install with uv (recommended) +uvx mcbluetooth + +# Or install from source +uv pip install -e . +``` + +## Usage with Claude Code + +```bash +# Add to Claude Code (from source) +claude mcp add mcbluetooth-local -- uv run --directory /path/to/mcbluetooth mcbluetooth + +# Or if published to PyPI +claude mcp add mcbluetooth -- uvx mcbluetooth +``` + +## Requirements + +- Linux with BlueZ 5.x +- Python 3.11+ +- PipeWire or PulseAudio (for audio features) +- User must be in `bluetooth` group or have polkit permissions + +### Permissions + +```bash +# Add user to bluetooth group +sudo usermod -aG bluetooth $USER + +# Or configure polkit for BlueZ D-Bus access +``` + +## MCP Tools + +### Adapter Tools +| Tool | Description | +|------|-------------| +| `bt_list_adapters` | List all Bluetooth adapters | +| `bt_adapter_info` | Get adapter details | +| `bt_adapter_power` | Power on/off | +| `bt_adapter_discoverable` | Set visible to other devices | +| `bt_adapter_pairable` | Enable/disable pairing acceptance | +| `bt_adapter_set_alias` | Set friendly name | + +### Device Tools +| Tool | Description | +|------|-------------| +| `bt_scan` | Scan for devices (classic/BLE/both) | +| `bt_list_devices` | List known devices with filters | +| `bt_device_info` | Get device details | +| `bt_pair` | Initiate pairing | +| `bt_pair_confirm` | Confirm/reject pairing | +| `bt_unpair` | Remove device | +| `bt_connect` | Connect to paired device | +| `bt_disconnect` | Disconnect device | +| `bt_trust` | Trust/untrust device | +| `bt_block` | Block/unblock device | + +### Audio Tools +| Tool | Description | +|------|-------------| +| `bt_audio_list` | List all audio devices | +| `bt_audio_connect` | Connect audio profiles | +| `bt_audio_disconnect` | Disconnect audio | +| `bt_audio_set_profile` | Switch A2DP/HFP/off | +| `bt_audio_set_default` | Set as default sink | +| `bt_audio_volume` | Set volume (0-150) | +| `bt_audio_mute` | Mute/unmute | + +### BLE Tools +| Tool | Description | +|------|-------------| +| `bt_ble_scan` | BLE scan with filters | +| `bt_ble_services` | List GATT services | +| `bt_ble_characteristics` | List characteristics | +| `bt_ble_read` | Read characteristic value | +| `bt_ble_write` | Write characteristic value | +| `bt_ble_notify` | Enable/disable notifications | +| `bt_ble_battery` | Read battery level | + +## Example Prompts + +``` +# Discover and connect headphones +"Scan for Bluetooth devices and connect to my Sony headphones" + +# Switch audio profile for calls +"Switch my headphones to HFP mode for a phone call" + +# Read from a fitness tracker +"Connect to my fitness band and read the battery level" + +# Set up audio output +"List all Bluetooth audio devices and set my speaker as the default" +``` + +## Architecture + +``` +┌─────────────────────────────────────────────┐ +│ FastMCP Server │ +├─────────────────────────────────────────────┤ +│ Tool Categories │ +│ ┌─────────┬─────────┬─────────┬─────────┐ │ +│ │ Adapter │ Device │ Audio │ BLE │ │ +│ │ Tools │ Tools │ Tools │ Tools │ │ +│ └─────────┴─────────┴─────────┴─────────┘ │ +├─────────────────────────────────────────────┤ +│ BlueZ D-Bus Client Layer │ +│ (dbus-fast) │ +├─────────────────────────────────────────────┤ +│ PipeWire/PulseAudio Integration │ +│ (pulsectl-asyncio) │ +└─────────────────────────────────────────────┘ +``` + +## License + +MIT diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..370b8fa --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,61 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "mcbluetooth" +version = "2026.02.02" +description = "Comprehensive BlueZ MCP server - expose the full Linux Bluetooth stack to LLMs" +readme = "README.md" +requires-python = ">=3.11" +license = "MIT" +authors = [ + {name = "Ryan Malloy", email = "ryan@supported.systems"} +] +keywords = ["bluetooth", "bluez", "mcp", "fastmcp", "dbus", "ble", "gatt"] +classifiers = [ + "Development Status :: 4 - Beta", + "Environment :: Console", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Operating System :: POSIX :: Linux", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Topic :: System :: Hardware", + "Topic :: Communications", +] + +dependencies = [ + "fastmcp>=2.14.4", + "dbus-fast>=4.0.0", + "pulsectl-asyncio>=1.2.2", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.24", + "ruff>=0.8", +] + +[project.scripts] +mcbluetooth = "mcbluetooth:main" + +[project.urls] +Repository = "https://github.com/ryanmalloy/mcbluetooth" + +[tool.hatch.build.targets.wheel] +packages = ["src/mcbluetooth"] + +[tool.ruff] +line-length = 100 +target-version = "py311" + +[tool.ruff.lint] +select = ["E", "F", "W", "I", "B", "UP"] +ignore = ["E501"] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +testpaths = ["tests"] diff --git a/src/mcbluetooth/__init__.py b/src/mcbluetooth/__init__.py new file mode 100644 index 0000000..118bab4 --- /dev/null +++ b/src/mcbluetooth/__init__.py @@ -0,0 +1,5 @@ +"""mcbluetooth - Comprehensive BlueZ MCP server for Linux Bluetooth management.""" + +from mcbluetooth.server import main, mcp + +__all__ = ["mcp", "main"] diff --git a/src/mcbluetooth/audio.py b/src/mcbluetooth/audio.py new file mode 100644 index 0000000..e939290 --- /dev/null +++ b/src/mcbluetooth/audio.py @@ -0,0 +1,300 @@ +"""PipeWire/PulseAudio integration for Bluetooth audio control.""" + +from __future__ import annotations + +from dataclasses import dataclass, field + +import pulsectl_asyncio + + +@dataclass +class AudioSink: + """Information about an audio sink (output device).""" + + name: str + index: int + description: str + muted: bool + volume_percent: int + is_bluetooth: bool + bluetooth_address: str | None + state: str # "running", "idle", "suspended" + properties: dict[str, str] = field(default_factory=dict) + + +@dataclass +class AudioSource: + """Information about an audio source (input device).""" + + name: str + index: int + description: str + muted: bool + volume_percent: int + is_bluetooth: bool + bluetooth_address: str | None + state: str + properties: dict[str, str] = field(default_factory=dict) + + +@dataclass +class AudioCard: + """Information about an audio card (device with profiles).""" + + name: str + index: int + driver: str + active_profile: str + profiles: list[str] + is_bluetooth: bool + bluetooth_address: str | None + properties: dict[str, str] = field(default_factory=dict) + + +def _extract_bt_address(props: dict[str, str]) -> str | None: + """Extract Bluetooth address from PulseAudio properties.""" + # Try various property names used by different versions + for key in ["device.string", "api.bluez5.address", "bluez.path"]: + if key in props: + value = props[key] + # Check if it looks like a BT address + if len(value) == 17 and value.count(":") == 5: + return value.upper() + # Extract from path like /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX + if "dev_" in value: + parts = value.split("dev_") + if len(parts) > 1: + addr = parts[1].split("/")[0].replace("_", ":") + if len(addr) == 17: + return addr.upper() + return None + + +def _is_bluetooth(props: dict[str, str]) -> bool: + """Check if device is Bluetooth based on properties.""" + # Check various indicators + if "api.bluez5.address" in props: + return True + if "bluez" in props.get("device.api", "").lower(): + return True + if "bluetooth" in props.get("device.bus", "").lower(): + return True + if props.get("device.string", "").startswith("/org/bluez"): + return True + return False + + +class PulseAudioClient: + """Async client for PulseAudio/PipeWire audio control.""" + + def __init__(self): + self._pulse: pulsectl_asyncio.PulseAsync | None = None + + async def connect(self) -> None: + """Connect to PulseAudio/PipeWire server.""" + if self._pulse is not None: + return + self._pulse = pulsectl_asyncio.PulseAsync("mcbluetooth") + await self._pulse.connect() + + async def disconnect(self) -> None: + """Disconnect from audio server.""" + if self._pulse: + self._pulse.close() + self._pulse = None + + async def _ensure_connected(self) -> None: + """Ensure we're connected.""" + if self._pulse is None: + await self.connect() + + async def list_sinks(self) -> list[AudioSink]: + """List all audio sinks (output devices).""" + await self._ensure_connected() + assert self._pulse is not None + + sinks = [] + for sink in await self._pulse.sink_list(): + props = dict(sink.proplist) + is_bt = _is_bluetooth(props) + bt_addr = _extract_bt_address(props) if is_bt else None + + # Calculate average volume percentage + vol_pct = int(sink.volume.value_flat * 100) if sink.volume else 0 + + sinks.append( + AudioSink( + name=sink.name, + index=sink.index, + description=sink.description or sink.name, + muted=sink.mute == 1, + volume_percent=vol_pct, + is_bluetooth=is_bt, + bluetooth_address=bt_addr, + state=sink.state.name.lower() if hasattr(sink.state, "name") else str(sink.state), + properties=props, + ) + ) + return sinks + + async def list_sources(self) -> list[AudioSource]: + """List all audio sources (input devices).""" + await self._ensure_connected() + assert self._pulse is not None + + sources = [] + for source in await self._pulse.source_list(): + # Skip monitor sources + if ".monitor" in source.name: + continue + + props = dict(source.proplist) + is_bt = _is_bluetooth(props) + bt_addr = _extract_bt_address(props) if is_bt else None + + vol_pct = int(source.volume.value_flat * 100) if source.volume else 0 + + sources.append( + AudioSource( + name=source.name, + index=source.index, + description=source.description or source.name, + muted=source.mute == 1, + volume_percent=vol_pct, + is_bluetooth=is_bt, + bluetooth_address=bt_addr, + state=source.state.name.lower() if hasattr(source.state, "name") else str(source.state), + properties=props, + ) + ) + return sources + + async def list_cards(self) -> list[AudioCard]: + """List all audio cards (devices with profiles).""" + await self._ensure_connected() + assert self._pulse is not None + + cards = [] + for card in await self._pulse.card_list(): + props = dict(card.proplist) + is_bt = _is_bluetooth(props) + bt_addr = _extract_bt_address(props) if is_bt else None + + profile_names = [p.name for p in card.profile_list] + active_profile = card.profile_active.name if card.profile_active else "" + + cards.append( + AudioCard( + name=card.name, + index=card.index, + driver=card.driver, + active_profile=active_profile, + profiles=profile_names, + is_bluetooth=is_bt, + bluetooth_address=bt_addr, + properties=props, + ) + ) + return cards + + async def get_default_sink(self) -> str | None: + """Get the name of the default sink.""" + await self._ensure_connected() + assert self._pulse is not None + + info = await self._pulse.server_info() + return info.default_sink_name + + async def get_default_source(self) -> str | None: + """Get the name of the default source.""" + await self._ensure_connected() + assert self._pulse is not None + + info = await self._pulse.server_info() + return info.default_source_name + + async def set_default_sink(self, sink_name: str) -> None: + """Set the default audio sink.""" + await self._ensure_connected() + assert self._pulse is not None + + await self._pulse.sink_default_set(sink_name) + + async def set_default_source(self, source_name: str) -> None: + """Set the default audio source.""" + await self._ensure_connected() + assert self._pulse is not None + + await self._pulse.source_default_set(source_name) + + async def set_sink_volume(self, sink_name: str, volume_percent: int) -> None: + """Set sink volume (0-100).""" + await self._ensure_connected() + assert self._pulse is not None + + volume_percent = max(0, min(150, volume_percent)) # Allow up to 150% + vol = volume_percent / 100.0 + + # Find the sink + for sink in await self._pulse.sink_list(): + if sink.name == sink_name: + await self._pulse.volume_set_all_chans(sink, vol) + return + raise ValueError(f"Sink not found: {sink_name}") + + async def set_sink_mute(self, sink_name: str, muted: bool) -> None: + """Mute or unmute a sink.""" + await self._ensure_connected() + assert self._pulse is not None + + for sink in await self._pulse.sink_list(): + if sink.name == sink_name: + await self._pulse.sink_mute(sink.index, muted) + return + raise ValueError(f"Sink not found: {sink_name}") + + async def set_card_profile(self, card_name: str, profile_name: str) -> None: + """Set the active profile for a card. + + For Bluetooth devices, common profiles include: + - a2dp-sink: High quality audio output (A2DP) + - headset-head-unit: Lower quality but with microphone (HFP/HSP) + - off: Disable the device + """ + await self._ensure_connected() + assert self._pulse is not None + + for card in await self._pulse.card_list(): + if card.name == card_name: + await self._pulse.card_profile_set(card, profile_name) + return + raise ValueError(f"Card not found: {card_name}") + + async def find_bluetooth_sink(self, address: str) -> AudioSink | None: + """Find a sink by Bluetooth address.""" + address = address.upper() + for sink in await self.list_sinks(): + if sink.bluetooth_address == address: + return sink + return None + + async def find_bluetooth_card(self, address: str) -> AudioCard | None: + """Find a card by Bluetooth address.""" + address = address.upper() + for card in await self.list_cards(): + if card.bluetooth_address == address: + return card + return None + + +# Global client instance +_audio_client: PulseAudioClient | None = None + + +async def get_audio_client() -> PulseAudioClient: + """Get or create the global audio client.""" + global _audio_client + if _audio_client is None: + _audio_client = PulseAudioClient() + await _audio_client.connect() + return _audio_client diff --git a/src/mcbluetooth/dbus_client.py b/src/mcbluetooth/dbus_client.py new file mode 100644 index 0000000..0ab1836 --- /dev/null +++ b/src/mcbluetooth/dbus_client.py @@ -0,0 +1,589 @@ +"""BlueZ D-Bus client wrapper using dbus-fast. + +This module provides an async interface to the BlueZ Bluetooth stack via D-Bus. +BlueZ exposes its functionality through several D-Bus interfaces: + +- org.bluez.Adapter1: Adapter control (power, discoverable, scanning) +- org.bluez.Device1: Device management (pair, connect, trust) +- org.bluez.GattService1: BLE GATT services +- org.bluez.GattCharacteristic1: BLE GATT characteristics +- org.bluez.Agent1: Pairing agent for PIN/confirmation handling +- org.bluez.AgentManager1: Agent registration + +Object paths follow this pattern: +- /org/bluez/hci0 - Adapter +- /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX - Device +- /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX/serviceXXXX - GATT service +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from dbus_fast import BusType, Variant +from dbus_fast.aio import MessageBus, ProxyInterface + +# BlueZ constants +BLUEZ_SERVICE = "org.bluez" +BLUEZ_ADAPTER_IFACE = "org.bluez.Adapter1" +BLUEZ_DEVICE_IFACE = "org.bluez.Device1" +BLUEZ_GATT_SERVICE_IFACE = "org.bluez.GattService1" +BLUEZ_GATT_CHAR_IFACE = "org.bluez.GattCharacteristic1" +BLUEZ_GATT_DESC_IFACE = "org.bluez.GattDescriptor1" +BLUEZ_BATTERY_IFACE = "org.bluez.Battery1" +BLUEZ_AGENT_IFACE = "org.bluez.Agent1" +BLUEZ_AGENT_MANAGER_IFACE = "org.bluez.AgentManager1" + +DBUS_PROPS_IFACE = "org.freedesktop.DBus.Properties" +DBUS_OBJMANAGER_IFACE = "org.freedesktop.DBus.ObjectManager" + + +def address_to_path(adapter: str, address: str) -> str: + """Convert a Bluetooth address to D-Bus object path.""" + addr_path = address.upper().replace(":", "_") + return f"/org/bluez/{adapter}/dev_{addr_path}" + + +def path_to_address(path: str) -> str: + """Extract Bluetooth address from D-Bus object path.""" + # /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX -> XX:XX:XX:XX:XX:XX + parts = path.split("/") + if len(parts) >= 5 and parts[-1].startswith("dev_"): + return parts[-1][4:].replace("_", ":") + return "" + + +@dataclass +class AdapterInfo: + """Information about a Bluetooth adapter.""" + + path: str + name: str # e.g., "hci0" + address: str + alias: str + powered: bool + discoverable: bool + discoverable_timeout: int + pairable: bool + pairable_timeout: int + discovering: bool + uuids: list[str] = field(default_factory=list) + modalias: str = "" + device_class: int = 0 + + +@dataclass +class DeviceInfo: + """Information about a Bluetooth device.""" + + path: str + adapter: str + address: str + name: str + alias: str + paired: bool + bonded: bool + trusted: bool + blocked: bool + connected: bool + rssi: int | None + tx_power: int | None + uuids: list[str] = field(default_factory=list) + device_class: int = 0 + appearance: int = 0 + icon: str = "" + services_resolved: bool = False + manufacturer_data: dict[int, bytes] = field(default_factory=dict) + service_data: dict[str, bytes] = field(default_factory=dict) + + +@dataclass +class GattService: + """Information about a GATT service.""" + + path: str + uuid: str + primary: bool + device_path: str + + +@dataclass +class GattCharacteristic: + """Information about a GATT characteristic.""" + + path: str + uuid: str + service_path: str + flags: list[str] = field(default_factory=list) + notifying: bool = False + + +class BlueZClient: + """Async client for BlueZ D-Bus API.""" + + def __init__(self): + self._bus: MessageBus | None = None + self._object_manager: ProxyInterface | None = None + self._objects_cache: dict[str, dict[str, dict[str, Any]]] = {} + + async def connect(self) -> None: + """Connect to the system D-Bus.""" + if self._bus is not None: + return + + self._bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + + # Get the ObjectManager for BlueZ + introspection = await self._bus.introspect(BLUEZ_SERVICE, "/") + proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, "/", introspection) + self._object_manager = proxy.get_interface(DBUS_OBJMANAGER_IFACE) + + async def disconnect(self) -> None: + """Disconnect from D-Bus.""" + if self._bus: + self._bus.disconnect() + self._bus = None + self._object_manager = None + + async def _ensure_connected(self) -> None: + """Ensure we're connected to D-Bus.""" + if self._bus is None: + await self.connect() + + async def _get_managed_objects(self) -> dict[str, dict[str, dict[str, Any]]]: + """Get all managed objects from BlueZ.""" + await self._ensure_connected() + assert self._object_manager is not None + + objects = await self._object_manager.call_get_managed_objects() + # Convert Variant values to Python types + result = {} + for path, interfaces in objects.items(): + result[path] = {} + for iface, props in interfaces.items(): + result[path][iface] = { + k: v.value if isinstance(v, Variant) else v for k, v in props.items() + } + return result + + async def _get_interface(self, path: str, interface: str) -> ProxyInterface: + """Get a proxy interface for an object.""" + await self._ensure_connected() + assert self._bus is not None + + introspection = await self._bus.introspect(BLUEZ_SERVICE, path) + proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, path, introspection) + return proxy.get_interface(interface) + + async def _get_property(self, path: str, interface: str, prop: str) -> Any: + """Get a single property from an object.""" + props_iface = await self._get_interface(path, DBUS_PROPS_IFACE) + result = await props_iface.call_get(interface, prop) + return result.value if isinstance(result, Variant) else result + + async def _set_property(self, path: str, interface: str, prop: str, value: Any) -> None: + """Set a single property on an object.""" + props_iface = await self._get_interface(path, DBUS_PROPS_IFACE) + # Wrap value in appropriate Variant + if isinstance(value, bool): + variant = Variant("b", value) + elif isinstance(value, int): + variant = Variant("u", value) + elif isinstance(value, str): + variant = Variant("s", value) + else: + variant = Variant("v", value) + await props_iface.call_set(interface, prop, variant) + + async def _get_all_properties(self, path: str, interface: str) -> dict[str, Any]: + """Get all properties from an object.""" + props_iface = await self._get_interface(path, DBUS_PROPS_IFACE) + props = await props_iface.call_get_all(interface) + return {k: v.value if isinstance(v, Variant) else v for k, v in props.items()} + + # ==================== Adapter Operations ==================== + + async def list_adapters(self) -> list[AdapterInfo]: + """List all Bluetooth adapters.""" + objects = await self._get_managed_objects() + adapters = [] + + for path, interfaces in objects.items(): + if BLUEZ_ADAPTER_IFACE in interfaces: + props = interfaces[BLUEZ_ADAPTER_IFACE] + # Extract adapter name from path: /org/bluez/hci0 -> hci0 + name = path.split("/")[-1] + adapters.append( + AdapterInfo( + path=path, + name=name, + address=props.get("Address", ""), + alias=props.get("Alias", ""), + powered=props.get("Powered", False), + discoverable=props.get("Discoverable", False), + discoverable_timeout=props.get("DiscoverableTimeout", 0), + pairable=props.get("Pairable", False), + pairable_timeout=props.get("PairableTimeout", 0), + discovering=props.get("Discovering", False), + uuids=props.get("UUIDs", []), + modalias=props.get("Modalias", ""), + device_class=props.get("Class", 0), + ) + ) + + return adapters + + async def get_adapter(self, adapter: str) -> AdapterInfo | None: + """Get info for a specific adapter.""" + adapters = await self.list_adapters() + for a in adapters: + if a.name == adapter: + return a + return None + + async def set_adapter_power(self, adapter: str, powered: bool) -> None: + """Set adapter power state.""" + path = f"/org/bluez/{adapter}" + await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Powered", powered) + + async def set_adapter_discoverable( + self, adapter: str, discoverable: bool, timeout: int = 0 + ) -> None: + """Set adapter discoverable state.""" + path = f"/org/bluez/{adapter}" + if timeout > 0: + await self._set_property( + path, BLUEZ_ADAPTER_IFACE, "DiscoverableTimeout", timeout + ) + await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Discoverable", discoverable) + + async def set_adapter_pairable(self, adapter: str, pairable: bool, timeout: int = 0) -> None: + """Set adapter pairable state.""" + path = f"/org/bluez/{adapter}" + if timeout > 0: + await self._set_property(path, BLUEZ_ADAPTER_IFACE, "PairableTimeout", timeout) + await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Pairable", pairable) + + async def set_adapter_alias(self, adapter: str, alias: str) -> None: + """Set adapter friendly name.""" + path = f"/org/bluez/{adapter}" + await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Alias", alias) + + # ==================== Discovery Operations ==================== + + async def start_discovery(self, adapter: str) -> None: + """Start device discovery on an adapter.""" + path = f"/org/bluez/{adapter}" + iface = await self._get_interface(path, BLUEZ_ADAPTER_IFACE) + await iface.call_start_discovery() + + async def stop_discovery(self, adapter: str) -> None: + """Stop device discovery on an adapter.""" + path = f"/org/bluez/{adapter}" + iface = await self._get_interface(path, BLUEZ_ADAPTER_IFACE) + await iface.call_stop_discovery() + + async def set_discovery_filter( + self, + adapter: str, + uuids: list[str] | None = None, + rssi: int | None = None, + transport: str = "auto", + duplicate_data: bool = False, + ) -> None: + """Set discovery filter for BLE scanning. + + Args: + adapter: Adapter name (e.g., "hci0") + uuids: List of service UUIDs to filter by + rssi: Minimum RSSI threshold + transport: "auto", "bredr", or "le" + duplicate_data: Report duplicate advertisements + """ + path = f"/org/bluez/{adapter}" + iface = await self._get_interface(path, BLUEZ_ADAPTER_IFACE) + + filter_dict: dict[str, Variant] = {} + if uuids: + filter_dict["UUIDs"] = Variant("as", uuids) + if rssi is not None: + filter_dict["RSSI"] = Variant("n", rssi) + filter_dict["Transport"] = Variant("s", transport) + filter_dict["DuplicateData"] = Variant("b", duplicate_data) + + await iface.call_set_discovery_filter(filter_dict) + + async def remove_discovery_filter(self, adapter: str) -> None: + """Remove discovery filter.""" + path = f"/org/bluez/{adapter}" + iface = await self._get_interface(path, BLUEZ_ADAPTER_IFACE) + # Set empty filter to clear + await iface.call_set_discovery_filter({}) + + # ==================== Device Operations ==================== + + async def list_devices( + self, + adapter: str | None = None, + filter_type: str = "all", + ) -> list[DeviceInfo]: + """List devices, optionally filtered. + + Args: + adapter: Filter to devices on this adapter, or None for all + filter_type: "all", "paired", "connected", or "trusted" + """ + objects = await self._get_managed_objects() + devices = [] + + for path, interfaces in objects.items(): + if BLUEZ_DEVICE_IFACE not in interfaces: + continue + + # Filter by adapter + if adapter and not path.startswith(f"/org/bluez/{adapter}/"): + continue + + props = interfaces[BLUEZ_DEVICE_IFACE] + + # Apply filter + if filter_type == "paired" and not props.get("Paired", False): + continue + if filter_type == "connected" and not props.get("Connected", False): + continue + if filter_type == "trusted" and not props.get("Trusted", False): + continue + + # Extract adapter name from path + adapter_name = path.split("/")[3] if len(path.split("/")) > 3 else "" + + # Parse manufacturer data + mfr_data = {} + if "ManufacturerData" in props: + for k, v in props["ManufacturerData"].items(): + mfr_data[k] = bytes(v) if isinstance(v, (list, bytearray)) else v + + # Parse service data + svc_data = {} + if "ServiceData" in props: + for k, v in props["ServiceData"].items(): + svc_data[k] = bytes(v) if isinstance(v, (list, bytearray)) else v + + devices.append( + DeviceInfo( + path=path, + adapter=adapter_name, + address=props.get("Address", ""), + name=props.get("Name", ""), + alias=props.get("Alias", ""), + paired=props.get("Paired", False), + bonded=props.get("Bonded", False), + trusted=props.get("Trusted", False), + blocked=props.get("Blocked", False), + connected=props.get("Connected", False), + rssi=props.get("RSSI"), + tx_power=props.get("TxPower"), + uuids=props.get("UUIDs", []), + device_class=props.get("Class", 0), + appearance=props.get("Appearance", 0), + icon=props.get("Icon", ""), + services_resolved=props.get("ServicesResolved", False), + manufacturer_data=mfr_data, + service_data=svc_data, + ) + ) + + return devices + + async def get_device(self, adapter: str, address: str) -> DeviceInfo | None: + """Get info for a specific device.""" + devices = await self.list_devices(adapter=adapter) + for d in devices: + if d.address.upper() == address.upper(): + return d + return None + + async def pair_device(self, adapter: str, address: str) -> None: + """Initiate pairing with a device. + + Note: This may require an agent for PIN entry or confirmation. + """ + path = address_to_path(adapter, address) + iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE) + await iface.call_pair() + + async def cancel_pairing(self, adapter: str, address: str) -> None: + """Cancel ongoing pairing.""" + path = address_to_path(adapter, address) + iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE) + await iface.call_cancel_pairing() + + async def connect_device(self, adapter: str, address: str) -> None: + """Connect to a device.""" + path = address_to_path(adapter, address) + iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE) + await iface.call_connect() + + async def disconnect_device(self, adapter: str, address: str) -> None: + """Disconnect from a device.""" + path = address_to_path(adapter, address) + iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE) + await iface.call_disconnect() + + async def remove_device(self, adapter: str, address: str) -> None: + """Remove a device from the known devices list.""" + device_path = address_to_path(adapter, address) + adapter_path = f"/org/bluez/{adapter}" + iface = await self._get_interface(adapter_path, BLUEZ_ADAPTER_IFACE) + await iface.call_remove_device(device_path) + + async def set_device_trusted(self, adapter: str, address: str, trusted: bool) -> None: + """Set device trusted state.""" + path = address_to_path(adapter, address) + await self._set_property(path, BLUEZ_DEVICE_IFACE, "Trusted", trusted) + + async def set_device_blocked(self, adapter: str, address: str, blocked: bool) -> None: + """Set device blocked state.""" + path = address_to_path(adapter, address) + await self._set_property(path, BLUEZ_DEVICE_IFACE, "Blocked", blocked) + + async def set_device_alias(self, adapter: str, address: str, alias: str) -> None: + """Set device friendly name.""" + path = address_to_path(adapter, address) + await self._set_property(path, BLUEZ_DEVICE_IFACE, "Alias", alias) + + # ==================== Profile Operations ==================== + + async def connect_profile(self, adapter: str, address: str, uuid: str) -> None: + """Connect a specific profile.""" + path = address_to_path(adapter, address) + iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE) + await iface.call_connect_profile(uuid) + + async def disconnect_profile(self, adapter: str, address: str, uuid: str) -> None: + """Disconnect a specific profile.""" + path = address_to_path(adapter, address) + iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE) + await iface.call_disconnect_profile(uuid) + + # ==================== GATT Operations ==================== + + async def list_gatt_services(self, adapter: str, address: str) -> list[GattService]: + """List GATT services for a device.""" + device_path = address_to_path(adapter, address) + objects = await self._get_managed_objects() + services = [] + + for path, interfaces in objects.items(): + if not path.startswith(device_path + "/"): + continue + if BLUEZ_GATT_SERVICE_IFACE not in interfaces: + continue + + props = interfaces[BLUEZ_GATT_SERVICE_IFACE] + services.append( + GattService( + path=path, + uuid=props.get("UUID", ""), + primary=props.get("Primary", True), + device_path=props.get("Device", device_path), + ) + ) + + return services + + async def list_gatt_characteristics( + self, adapter: str, address: str, service_uuid: str | None = None + ) -> list[GattCharacteristic]: + """List GATT characteristics for a device, optionally filtered by service.""" + device_path = address_to_path(adapter, address) + objects = await self._get_managed_objects() + + # First find services if filtering + service_paths = set() + if service_uuid: + for path, interfaces in objects.items(): + if BLUEZ_GATT_SERVICE_IFACE in interfaces: + if interfaces[BLUEZ_GATT_SERVICE_IFACE].get("UUID", "").lower() == service_uuid.lower(): + service_paths.add(path) + + characteristics = [] + for path, interfaces in objects.items(): + if not path.startswith(device_path + "/"): + continue + if BLUEZ_GATT_CHAR_IFACE not in interfaces: + continue + + props = interfaces[BLUEZ_GATT_CHAR_IFACE] + service_path = props.get("Service", "") + + # Filter by service if specified + if service_uuid and service_path not in service_paths: + continue + + characteristics.append( + GattCharacteristic( + path=path, + uuid=props.get("UUID", ""), + service_path=service_path, + flags=props.get("Flags", []), + notifying=props.get("Notifying", False), + ) + ) + + return characteristics + + async def read_characteristic(self, char_path: str) -> bytes: + """Read a GATT characteristic value.""" + iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE) + result = await iface.call_read_value({}) + return bytes(result) + + async def write_characteristic( + self, char_path: str, value: bytes, write_type: str = "request" + ) -> None: + """Write a GATT characteristic value. + + Args: + char_path: D-Bus path of the characteristic + value: Bytes to write + write_type: "request" (with response) or "command" (without response) + """ + iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE) + options = {"type": Variant("s", write_type)} + await iface.call_write_value(list(value), options) + + async def start_notify(self, char_path: str) -> None: + """Start notifications for a characteristic.""" + iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE) + await iface.call_start_notify() + + async def stop_notify(self, char_path: str) -> None: + """Stop notifications for a characteristic.""" + iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE) + await iface.call_stop_notify() + + # ==================== Battery Operations ==================== + + async def get_battery_level(self, adapter: str, address: str) -> int | None: + """Get battery level from Battery1 interface if available.""" + path = address_to_path(adapter, address) + objects = await self._get_managed_objects() + + if path in objects and BLUEZ_BATTERY_IFACE in objects[path]: + return objects[path][BLUEZ_BATTERY_IFACE].get("Percentage") + return None + + +# Global client instance +_client: BlueZClient | None = None + + +async def get_client() -> BlueZClient: + """Get or create the global BlueZ client.""" + global _client + if _client is None: + _client = BlueZClient() + await _client.connect() + return _client diff --git a/src/mcbluetooth/server.py b/src/mcbluetooth/server.py new file mode 100644 index 0000000..b505a36 --- /dev/null +++ b/src/mcbluetooth/server.py @@ -0,0 +1,51 @@ +"""FastMCP server for BlueZ Bluetooth management.""" + +from fastmcp import FastMCP + +from mcbluetooth.tools import adapter, audio, ble, device + +mcp = FastMCP( + name="mcbluetooth", + instructions="""Bluetooth management server for Linux (BlueZ). + +This server provides comprehensive control over the Linux Bluetooth stack: +- Adapter management (power, discoverable, pairable) +- Device discovery and management (scan, pair, connect) +- Audio profiles (A2DP, HFP) with PipeWire/PulseAudio integration +- BLE/GATT services (read/write characteristics, notifications) + +All tools require an explicit 'adapter' parameter (e.g., "hci0"). +Use bt_list_adapters() to discover available adapters. + +For pairing, use pairing_mode parameter: +- "elicit": Use MCP elicitation to request PIN from user (preferred) +- "interactive": Return awaiting status, then call bt_pair_confirm +- "auto": Auto-accept pairings (use in trusted environments only) +""", +) + +# Register tool modules +adapter.register_tools(mcp) +device.register_tools(mcp) +audio.register_tools(mcp) +ble.register_tools(mcp) + + +def main(): + """Entry point for the mcbluetooth MCP server.""" + try: + from importlib.metadata import version + + package_version = version("mcbluetooth") + except Exception: + package_version = "dev" + + print(f"🔵 mcbluetooth v{package_version}") + print(" BlueZ MCP Server - Bluetooth management for LLMs") + print() + + mcp.run() + + +if __name__ == "__main__": + main() diff --git a/src/mcbluetooth/tools/__init__.py b/src/mcbluetooth/tools/__init__.py new file mode 100644 index 0000000..3041833 --- /dev/null +++ b/src/mcbluetooth/tools/__init__.py @@ -0,0 +1,5 @@ +"""MCP tool modules for Bluetooth management.""" + +from mcbluetooth.tools import adapter, audio, ble, device + +__all__ = ["adapter", "device", "audio", "ble"] diff --git a/src/mcbluetooth/tools/adapter.py b/src/mcbluetooth/tools/adapter.py new file mode 100644 index 0000000..8cee2a3 --- /dev/null +++ b/src/mcbluetooth/tools/adapter.py @@ -0,0 +1,117 @@ +"""Adapter management tools for BlueZ.""" + +from __future__ import annotations + +from dataclasses import asdict +from typing import Any + +from fastmcp import FastMCP + +from mcbluetooth.dbus_client import get_client + + +def register_tools(mcp: FastMCP) -> None: + """Register adapter management tools with the MCP server.""" + + @mcp.tool() + async def bt_list_adapters() -> list[dict[str, Any]]: + """List all Bluetooth adapters on the system. + + Returns a list of adapters with their properties including: + - name: Adapter identifier (e.g., "hci0") + - address: Bluetooth MAC address + - powered: Whether adapter is on + - discoverable: Whether adapter is visible to other devices + - discovering: Whether currently scanning + """ + client = await get_client() + adapters = await client.list_adapters() + return [asdict(a) for a in adapters] + + @mcp.tool() + async def bt_adapter_info(adapter: str) -> dict[str, Any] | None: + """Get detailed information about a specific adapter. + + Args: + adapter: Adapter name (e.g., "hci0") + + Returns: + Adapter properties or None if not found + """ + client = await get_client() + info = await client.get_adapter(adapter) + return asdict(info) if info else None + + @mcp.tool() + async def bt_adapter_power(adapter: str, on: bool) -> dict[str, Any]: + """Power an adapter on or off. + + Args: + adapter: Adapter name (e.g., "hci0") + on: True to power on, False to power off + + Returns: + Updated adapter info + """ + client = await get_client() + await client.set_adapter_power(adapter, on) + info = await client.get_adapter(adapter) + return asdict(info) if info else {"error": "Adapter not found"} + + @mcp.tool() + async def bt_adapter_discoverable( + adapter: str, + on: bool, + timeout: int = 180, + ) -> dict[str, Any]: + """Set adapter discoverable (visible to other devices). + + Args: + adapter: Adapter name (e.g., "hci0") + on: True to enable discoverable, False to disable + timeout: Discoverable timeout in seconds (0 = forever, default 180) + + Returns: + Updated adapter info + """ + client = await get_client() + await client.set_adapter_discoverable(adapter, on, timeout) + info = await client.get_adapter(adapter) + return asdict(info) if info else {"error": "Adapter not found"} + + @mcp.tool() + async def bt_adapter_pairable( + adapter: str, + on: bool, + timeout: int = 0, + ) -> dict[str, Any]: + """Set adapter pairable state. + + Args: + adapter: Adapter name (e.g., "hci0") + on: True to enable pairing acceptance, False to disable + timeout: Pairable timeout in seconds (0 = forever) + + Returns: + Updated adapter info + """ + client = await get_client() + await client.set_adapter_pairable(adapter, on, timeout) + info = await client.get_adapter(adapter) + return asdict(info) if info else {"error": "Adapter not found"} + + @mcp.tool() + async def bt_adapter_set_alias(adapter: str, alias: str) -> dict[str, Any]: + """Set adapter friendly name (alias). + + Args: + adapter: Adapter name (e.g., "hci0") + alias: New friendly name for the adapter + + Returns: + Updated adapter info + """ + client = await get_client() + await client.set_adapter_alias(adapter, alias) + info = await client.get_adapter(adapter) + return asdict(info) if info else {"error": "Adapter not found"} diff --git a/src/mcbluetooth/tools/audio.py b/src/mcbluetooth/tools/audio.py new file mode 100644 index 0000000..e4c858b --- /dev/null +++ b/src/mcbluetooth/tools/audio.py @@ -0,0 +1,287 @@ +"""Audio profile management tools for Bluetooth devices.""" + +from __future__ import annotations + +from dataclasses import asdict +from typing import Any, Literal + +from fastmcp import FastMCP + +from mcbluetooth.audio import get_audio_client +from mcbluetooth.dbus_client import get_client + +# Common Bluetooth audio UUIDs +A2DP_SINK_UUID = "0000110b-0000-1000-8000-00805f9b34fb" +A2DP_SOURCE_UUID = "0000110a-0000-1000-8000-00805f9b34fb" +HFP_HF_UUID = "0000111e-0000-1000-8000-00805f9b34fb" +HFP_AG_UUID = "0000111f-0000-1000-8000-00805f9b34fb" +HSP_HS_UUID = "00001108-0000-1000-8000-00805f9b34fb" +HSP_AG_UUID = "00001112-0000-1000-8000-00805f9b34fb" + + +def register_tools(mcp: FastMCP) -> None: + """Register audio management tools with the MCP server.""" + + @mcp.tool() + async def bt_audio_list() -> dict[str, Any]: + """List all audio devices including Bluetooth devices. + + Returns information about: + - Sinks (audio outputs like speakers, headphones) + - Sources (audio inputs like microphones) + - Cards (devices with switchable profiles) + + Bluetooth audio devices show their MAC address. + """ + audio = await get_audio_client() + + sinks = await audio.list_sinks() + sources = await audio.list_sources() + cards = await audio.list_cards() + + default_sink = await audio.get_default_sink() + default_source = await audio.get_default_source() + + return { + "default_sink": default_sink, + "default_source": default_source, + "sinks": [asdict(s) for s in sinks], + "sources": [asdict(s) for s in sources], + "cards": [asdict(c) for c in cards], + "bluetooth_sinks": [asdict(s) for s in sinks if s.is_bluetooth], + "bluetooth_cards": [asdict(c) for c in cards if c.is_bluetooth], + } + + @mcp.tool() + async def bt_audio_connect(adapter: str, address: str) -> dict[str, Any]: + """Connect audio profiles to a Bluetooth device. + + This connects the A2DP (high-quality audio) and/or HFP (hands-free) + profiles to the specified Bluetooth device. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + + Returns: + Connection status and available audio info + """ + bt_client = await get_client() + + # First ensure device is connected + device = await bt_client.get_device(adapter, address) + if not device: + return {"status": "error", "error": "Device not found"} + + if not device.paired: + return {"status": "error", "error": "Device not paired. Pair first with bt_pair."} + + if not device.connected: + try: + await bt_client.connect_device(adapter, address) + except Exception as e: + return {"status": "error", "error": f"Connection failed: {e}"} + + # Try to connect audio profiles + errors = [] + connected_profiles = [] + + for uuid, name in [(A2DP_SINK_UUID, "A2DP"), (HFP_HF_UUID, "HFP")]: + if uuid.lower() in [u.lower() for u in device.uuids]: + try: + await bt_client.connect_profile(adapter, address, uuid) + connected_profiles.append(name) + except Exception as e: + errors.append(f"{name}: {e}") + + # Check audio system + audio = await get_audio_client() + sink = await audio.find_bluetooth_sink(address) + card = await audio.find_bluetooth_card(address) + + return { + "status": "connected" if connected_profiles else "no_audio_profiles", + "connected_profiles": connected_profiles, + "errors": errors if errors else None, + "audio_sink": asdict(sink) if sink else None, + "audio_card": asdict(card) if card else None, + } + + @mcp.tool() + async def bt_audio_disconnect(adapter: str, address: str) -> dict[str, Any]: + """Disconnect audio profiles from a Bluetooth device. + + This disconnects audio profiles but keeps the device connected + for other services (if any). + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + + Returns: + Disconnection status + """ + bt_client = await get_client() + + device = await bt_client.get_device(adapter, address) + if not device: + return {"status": "error", "error": "Device not found"} + + errors = [] + disconnected_profiles = [] + + for uuid, name in [(A2DP_SINK_UUID, "A2DP"), (HFP_HF_UUID, "HFP")]: + if uuid.lower() in [u.lower() for u in device.uuids]: + try: + await bt_client.disconnect_profile(adapter, address, uuid) + disconnected_profiles.append(name) + except Exception as e: + errors.append(f"{name}: {e}") + + return { + "status": "disconnected" if disconnected_profiles else "no_audio_profiles", + "disconnected_profiles": disconnected_profiles, + "errors": errors if errors else None, + } + + @mcp.tool() + async def bt_audio_set_profile( + address: str, + profile: Literal["a2dp", "hfp", "off"], + ) -> dict[str, Any]: + """Switch audio profile for a Bluetooth device. + + Bluetooth audio devices support different profiles: + - a2dp: High-quality stereo audio (best for music) + - hfp: Hands-free profile with microphone (lower quality, for calls) + - off: Disable audio for this device + + Args: + address: Device Bluetooth address + profile: Profile to activate + + Returns: + Status and updated card info + """ + audio = await get_audio_client() + + card = await audio.find_bluetooth_card(address) + if not card: + return {"status": "error", "error": "Bluetooth audio card not found"} + + # Map profile names to PulseAudio profile names + # These vary by PipeWire/PulseAudio version + profile_map = { + "a2dp": ["a2dp-sink", "a2dp_sink", "a2dp-sink-aac", "a2dp-sink-sbc"], + "hfp": ["headset-head-unit", "headset_head_unit", "handsfree_head_unit"], + "off": ["off"], + } + + target_profiles = profile_map[profile] + + # Find a matching profile + matched_profile = None + for p in target_profiles: + if p in card.profiles: + matched_profile = p + break + + if not matched_profile: + return { + "status": "error", + "error": f"Profile '{profile}' not available", + "available_profiles": card.profiles, + } + + try: + await audio.set_card_profile(card.name, matched_profile) + # Refresh card info + card = await audio.find_bluetooth_card(address) + return { + "status": "profile_changed", + "profile": matched_profile, + "card": asdict(card) if card else None, + } + except Exception as e: + return {"status": "error", "error": str(e)} + + @mcp.tool() + async def bt_audio_set_default(address: str) -> dict[str, Any]: + """Set a Bluetooth device as the default audio output. + + Args: + address: Device Bluetooth address + + Returns: + Status and updated default sink info + """ + audio = await get_audio_client() + + sink = await audio.find_bluetooth_sink(address) + if not sink: + return {"status": "error", "error": "Bluetooth audio sink not found"} + + try: + await audio.set_default_sink(sink.name) + return { + "status": "default_set", + "sink_name": sink.name, + "description": sink.description, + } + except Exception as e: + return {"status": "error", "error": str(e)} + + @mcp.tool() + async def bt_audio_volume(address: str, volume: int) -> dict[str, Any]: + """Set volume for a Bluetooth audio device. + + Args: + address: Device Bluetooth address + volume: Volume level 0-100 (can go up to 150 for boost) + + Returns: + Status and updated volume info + """ + audio = await get_audio_client() + + sink = await audio.find_bluetooth_sink(address) + if not sink: + return {"status": "error", "error": "Bluetooth audio sink not found"} + + try: + await audio.set_sink_volume(sink.name, volume) + # Refresh sink info + sink = await audio.find_bluetooth_sink(address) + return { + "status": "volume_set", + "volume": sink.volume_percent if sink else volume, + "sink_name": sink.name if sink else None, + } + except Exception as e: + return {"status": "error", "error": str(e)} + + @mcp.tool() + async def bt_audio_mute(address: str, muted: bool) -> dict[str, Any]: + """Mute or unmute a Bluetooth audio device. + + Args: + address: Device Bluetooth address + muted: True to mute, False to unmute + + Returns: + Status and mute state + """ + audio = await get_audio_client() + + sink = await audio.find_bluetooth_sink(address) + if not sink: + return {"status": "error", "error": "Bluetooth audio sink not found"} + + try: + await audio.set_sink_mute(sink.name, muted) + return { + "status": "muted" if muted else "unmuted", + "sink_name": sink.name, + } + except Exception as e: + return {"status": "error", "error": str(e)} diff --git a/src/mcbluetooth/tools/ble.py b/src/mcbluetooth/tools/ble.py new file mode 100644 index 0000000..1a69264 --- /dev/null +++ b/src/mcbluetooth/tools/ble.py @@ -0,0 +1,403 @@ +"""Bluetooth Low Energy (BLE) and GATT tools.""" + +from __future__ import annotations + +import asyncio +from typing import Any + +from fastmcp import FastMCP + +from mcbluetooth.dbus_client import get_client + +# Common BLE service UUIDs +BATTERY_SERVICE_UUID = "0000180f-0000-1000-8000-00805f9b34fb" +BATTERY_LEVEL_CHAR_UUID = "00002a19-0000-1000-8000-00805f9b34fb" +DEVICE_INFO_SERVICE_UUID = "0000180a-0000-1000-8000-00805f9b34fb" +HEART_RATE_SERVICE_UUID = "0000180d-0000-1000-8000-00805f9b34fb" +HEART_RATE_MEASUREMENT_UUID = "00002a37-0000-1000-8000-00805f9b34fb" + + +def _format_uuid(uuid: str) -> str: + """Format UUID for display - short form for standard UUIDs.""" + # Standard Bluetooth UUIDs have the form 0000XXXX-0000-1000-8000-00805f9b34fb + if uuid.lower().endswith("-0000-1000-8000-00805f9b34fb"): + short = uuid[:8].lstrip("0") or "0" + return f"0x{short.upper()}" + return uuid + + +def register_tools(mcp: FastMCP) -> None: + """Register BLE/GATT tools with the MCP server.""" + + @mcp.tool() + async def bt_ble_scan( + adapter: str, + timeout: int = 10, + name_filter: str | None = None, + service_filter: str | None = None, + ) -> list[dict[str, Any]]: + """Scan for BLE (Bluetooth Low Energy) devices. + + BLE devices advertise their presence and can be filtered by name + or service UUID. Common services include: + - 0x180F: Battery Service + - 0x180D: Heart Rate Service + - 0x180A: Device Information + + Args: + adapter: Adapter name (e.g., "hci0") + timeout: Scan duration in seconds + name_filter: Only return devices with name containing this string + service_filter: Only return devices advertising this service UUID + + Returns: + List of BLE devices with advertisement data + """ + client = await get_client() + + # Set BLE-specific filter + uuids = [service_filter] if service_filter else None + await client.set_discovery_filter( + adapter, uuids=uuids, transport="le", duplicate_data=True + ) + + await client.start_discovery(adapter) + try: + await asyncio.sleep(timeout) + finally: + await client.stop_discovery(adapter) + await client.remove_discovery_filter(adapter) + + devices = await client.list_devices(adapter=adapter) + + # Apply name filter + if name_filter: + name_filter_lower = name_filter.lower() + devices = [d for d in devices if name_filter_lower in d.name.lower()] + + # Format for BLE-specific output + result = [] + for d in devices: + # Parse manufacturer data for display + mfr_data_hex = { + k: v.hex() if isinstance(v, bytes) else str(v) + for k, v in d.manufacturer_data.items() + } + svc_data_hex = { + k: v.hex() if isinstance(v, bytes) else str(v) + for k, v in d.service_data.items() + } + + result.append({ + "address": d.address, + "name": d.name or "(unknown)", + "rssi": d.rssi, + "paired": d.paired, + "connected": d.connected, + "uuids": [_format_uuid(u) for u in d.uuids], + "manufacturer_data": mfr_data_hex, + "service_data": svc_data_hex, + "appearance": d.appearance, + }) + + return result + + @mcp.tool() + async def bt_ble_services(adapter: str, address: str) -> list[dict[str, Any]]: + """List GATT services for a connected BLE device. + + The device must be connected for service discovery. + Services contain characteristics that can be read/written. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + + Returns: + List of GATT services with their UUIDs + """ + client = await get_client() + + # Check device is connected + device = await client.get_device(adapter, address) + if not device: + return [{"error": "Device not found"}] + if not device.connected: + return [{"error": "Device not connected. Use bt_connect first."}] + if not device.services_resolved: + # Wait a bit for services to resolve + await asyncio.sleep(2) + device = await client.get_device(adapter, address) + if not device or not device.services_resolved: + return [{"error": "Services not yet resolved. Try again in a moment."}] + + services = await client.list_gatt_services(adapter, address) + return [ + { + "uuid": s.uuid, + "uuid_short": _format_uuid(s.uuid), + "primary": s.primary, + "path": s.path, + } + for s in services + ] + + @mcp.tool() + async def bt_ble_characteristics( + adapter: str, + address: str, + service_uuid: str | None = None, + ) -> list[dict[str, Any]]: + """List GATT characteristics for a BLE device. + + Characteristics are the data points within a service that can be + read, written, or subscribed to for notifications. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + service_uuid: Filter to characteristics of this service (optional) + + Returns: + List of characteristics with their properties + """ + client = await get_client() + + device = await client.get_device(adapter, address) + if not device or not device.connected: + return [{"error": "Device not connected"}] + + chars = await client.list_gatt_characteristics(adapter, address, service_uuid) + return [ + { + "uuid": c.uuid, + "uuid_short": _format_uuid(c.uuid), + "path": c.path, + "flags": c.flags, + "notifying": c.notifying, + "readable": "read" in c.flags, + "writable": "write" in c.flags or "write-without-response" in c.flags, + "notifiable": "notify" in c.flags or "indicate" in c.flags, + } + for c in chars + ] + + @mcp.tool() + async def bt_ble_read( + adapter: str, + address: str, + char_uuid: str, + ) -> dict[str, Any]: + """Read a GATT characteristic value. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + char_uuid: Characteristic UUID to read + + Returns: + The characteristic value as hex and decoded (if possible) + """ + client = await get_client() + + # Find the characteristic + chars = await client.list_gatt_characteristics(adapter, address) + target_char = None + for c in chars: + if c.uuid.lower() == char_uuid.lower(): + target_char = c + break + + if not target_char: + return {"error": f"Characteristic {char_uuid} not found"} + + if "read" not in target_char.flags: + return {"error": "Characteristic is not readable"} + + try: + value = await client.read_characteristic(target_char.path) + result = { + "uuid": char_uuid, + "value_hex": value.hex(), + "value_bytes": list(value), + "length": len(value), + } + + # Try to decode as UTF-8 string + try: + result["value_string"] = value.decode("utf-8") + except UnicodeDecodeError: + pass + + # Try to decode as integer (common for single-byte values) + if len(value) == 1: + result["value_int"] = value[0] + elif len(value) == 2: + result["value_int"] = int.from_bytes(value, "little") + + return result + except Exception as e: + return {"error": str(e)} + + @mcp.tool() + async def bt_ble_write( + adapter: str, + address: str, + char_uuid: str, + value: str, + value_type: str = "hex", + with_response: bool = True, + ) -> dict[str, Any]: + """Write a value to a GATT characteristic. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + char_uuid: Characteristic UUID to write + value: Value to write + value_type: How to interpret value - "hex" (e.g., "0102ff"), + "string" (UTF-8), or "int" (decimal number) + with_response: Whether to wait for write confirmation + + Returns: + Write status + """ + client = await get_client() + + # Find the characteristic + chars = await client.list_gatt_characteristics(adapter, address) + target_char = None + for c in chars: + if c.uuid.lower() == char_uuid.lower(): + target_char = c + break + + if not target_char: + return {"error": f"Characteristic {char_uuid} not found"} + + if "write" not in target_char.flags and "write-without-response" not in target_char.flags: + return {"error": "Characteristic is not writable"} + + # Convert value to bytes + try: + if value_type == "hex": + data = bytes.fromhex(value) + elif value_type == "string": + data = value.encode("utf-8") + elif value_type == "int": + int_val = int(value) + # Determine byte length needed + byte_len = max(1, (int_val.bit_length() + 7) // 8) + data = int_val.to_bytes(byte_len, "little") + else: + return {"error": f"Unknown value_type: {value_type}"} + except ValueError as e: + return {"error": f"Invalid value format: {e}"} + + try: + write_type = "request" if with_response else "command" + await client.write_characteristic(target_char.path, data, write_type) + return { + "status": "written", + "uuid": char_uuid, + "value_hex": data.hex(), + "length": len(data), + } + except Exception as e: + return {"error": str(e)} + + @mcp.tool() + async def bt_ble_notify( + adapter: str, + address: str, + char_uuid: str, + enable: bool, + ) -> dict[str, Any]: + """Enable or disable notifications for a characteristic. + + When enabled, the device will send updates when the characteristic + value changes (e.g., heart rate measurements, sensor data). + + Note: To receive actual notifications, you would need to set up + a callback - this tool just enables/disables the notification mode. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + char_uuid: Characteristic UUID + enable: True to enable notifications, False to disable + + Returns: + Notification status + """ + client = await get_client() + + chars = await client.list_gatt_characteristics(adapter, address) + target_char = None + for c in chars: + if c.uuid.lower() == char_uuid.lower(): + target_char = c + break + + if not target_char: + return {"error": f"Characteristic {char_uuid} not found"} + + if "notify" not in target_char.flags and "indicate" not in target_char.flags: + return {"error": "Characteristic does not support notifications"} + + try: + if enable: + await client.start_notify(target_char.path) + else: + await client.stop_notify(target_char.path) + + return { + "status": "notifications_enabled" if enable else "notifications_disabled", + "uuid": char_uuid, + } + except Exception as e: + return {"error": str(e)} + + @mcp.tool() + async def bt_ble_battery(adapter: str, address: str) -> dict[str, Any]: + """Read battery level from a BLE device. + + Many BLE devices support the standard Battery Service (0x180F). + This is a convenience function that reads from that service. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + + Returns: + Battery percentage (0-100) or error + """ + client = await get_client() + + # First try BlueZ Battery1 interface (cached value) + battery_level = await client.get_battery_level(adapter, address) + if battery_level is not None: + return {"battery_percent": battery_level, "source": "bluez_battery1"} + + # Fall back to reading GATT characteristic + device = await client.get_device(adapter, address) + if not device or not device.connected: + return {"error": "Device not connected"} + + chars = await client.list_gatt_characteristics(adapter, address, BATTERY_SERVICE_UUID) + battery_char = None + for c in chars: + if c.uuid.lower() == BATTERY_LEVEL_CHAR_UUID.lower(): + battery_char = c + break + + if not battery_char: + return {"error": "Battery service not available on this device"} + + try: + value = await client.read_characteristic(battery_char.path) + if len(value) >= 1: + return {"battery_percent": value[0], "source": "gatt_characteristic"} + return {"error": "Invalid battery value"} + except Exception as e: + return {"error": str(e)} diff --git a/src/mcbluetooth/tools/device.py b/src/mcbluetooth/tools/device.py new file mode 100644 index 0000000..2b834bf --- /dev/null +++ b/src/mcbluetooth/tools/device.py @@ -0,0 +1,314 @@ +"""Device discovery and management tools for BlueZ.""" + +from __future__ import annotations + +import asyncio +from dataclasses import asdict +from typing import Any, Literal + +from fastmcp import FastMCP + +from mcbluetooth.dbus_client import get_client + + +def register_tools(mcp: FastMCP) -> None: + """Register device management tools with the MCP server.""" + + @mcp.tool() + async def bt_scan( + adapter: str, + timeout: int = 10, + mode: Literal["classic", "ble", "both"] = "both", + ) -> list[dict[str, Any]]: + """Scan for nearby Bluetooth devices. + + Starts discovery, waits for the specified timeout, then returns + all discovered devices. For BLE devices, ensure they are advertising. + + Args: + adapter: Adapter name (e.g., "hci0") + timeout: Scan duration in seconds (default 10) + mode: Scan mode - "classic" (BR/EDR), "ble" (Low Energy), or "both" + + Returns: + List of discovered devices with their properties + """ + client = await get_client() + + # Set discovery filter based on mode + transport = {"classic": "bredr", "ble": "le", "both": "auto"}[mode] + await client.set_discovery_filter(adapter, transport=transport, duplicate_data=True) + + # Start discovery + await client.start_discovery(adapter) + + try: + # Wait for scan duration + await asyncio.sleep(timeout) + finally: + # Always stop discovery + await client.stop_discovery(adapter) + await client.remove_discovery_filter(adapter) + + # Return discovered devices + devices = await client.list_devices(adapter=adapter) + return [asdict(d) for d in devices] + + @mcp.tool() + async def bt_list_devices( + adapter: str, + filter: Literal["all", "paired", "connected", "trusted"] = "all", + ) -> list[dict[str, Any]]: + """List known Bluetooth devices. + + Returns devices that have been discovered or paired previously. + Use bt_scan to discover new devices. + + Args: + adapter: Adapter name (e.g., "hci0") + filter: Filter type - "all", "paired", "connected", or "trusted" + + Returns: + List of devices matching the filter + """ + client = await get_client() + devices = await client.list_devices(adapter=adapter, filter_type=filter) + return [asdict(d) for d in devices] + + @mcp.tool() + async def bt_device_info(adapter: str, address: str) -> dict[str, Any] | None: + """Get detailed information about a specific device. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address (e.g., "AA:BB:CC:DD:EE:FF") + + Returns: + Device properties or None if not found + """ + client = await get_client() + info = await client.get_device(adapter, address) + return asdict(info) if info else None + + @mcp.tool() + async def bt_pair( + adapter: str, + address: str, + pairing_mode: Literal["elicit", "interactive", "auto"] = "interactive", + ) -> dict[str, Any]: + """Initiate pairing with a device. + + Pairing establishes a trusted relationship with a device. Some devices + require PIN entry or confirmation during pairing. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + pairing_mode: How to handle pairing requests: + - "elicit": Use MCP elicitation to request PIN from user (if supported) + - "interactive": Return status, then call bt_pair_confirm with PIN + - "auto": Auto-accept pairings (for trusted environments) + + Returns: + Pairing status including whether confirmation is needed + """ + client = await get_client() + + # Check if already paired + device = await client.get_device(adapter, address) + if device and device.paired: + return {"status": "already_paired", "device": asdict(device)} + + if pairing_mode == "auto": + # Direct pairing without agent - may fail if PIN required + try: + await client.pair_device(adapter, address) + device = await client.get_device(adapter, address) + return {"status": "paired", "device": asdict(device) if device else None} + except Exception as e: + return {"status": "error", "error": str(e)} + else: + # For interactive/elicit modes, we need an agent + # Return status indicating pairing initiated + try: + # Start pairing (this is async and may block for confirmation) + # In a real implementation, this would register an agent + await client.pair_device(adapter, address) + device = await client.get_device(adapter, address) + return {"status": "paired", "device": asdict(device) if device else None} + except Exception as e: + error_msg = str(e) + if "AuthenticationFailed" in error_msg or "AuthenticationCanceled" in error_msg: + return { + "status": "awaiting_confirmation", + "message": "Pairing requires user confirmation or PIN entry", + "pairing_mode": pairing_mode, + } + return {"status": "error", "error": error_msg} + + @mcp.tool() + async def bt_pair_confirm( + adapter: str, + address: str, + pin: str | None = None, + accept: bool = True, + ) -> dict[str, Any]: + """Confirm or reject a pairing request. + + Use this after bt_pair returns "awaiting_confirmation" status. + + Args: + adapter: Adapter name + address: Device Bluetooth address + pin: PIN code if required (usually 4-6 digits) + accept: True to accept pairing, False to reject + + Returns: + Pairing result + """ + client = await get_client() + + if not accept: + try: + await client.cancel_pairing(adapter, address) + return {"status": "pairing_cancelled"} + except Exception as e: + return {"status": "error", "error": str(e)} + + # Re-attempt pairing - agent would handle PIN + # For now, this is a simplified implementation + try: + await client.pair_device(adapter, address) + device = await client.get_device(adapter, address) + return {"status": "paired", "device": asdict(device) if device else None} + except Exception as e: + return {"status": "error", "error": str(e)} + + @mcp.tool() + async def bt_unpair(adapter: str, address: str) -> dict[str, str]: + """Remove pairing with a device. + + This removes the device from the list of known devices and + deletes all pairing information. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + + Returns: + Status of the operation + """ + client = await get_client() + try: + await client.remove_device(adapter, address) + return {"status": "removed", "address": address} + except Exception as e: + return {"status": "error", "error": str(e)} + + @mcp.tool() + async def bt_connect(adapter: str, address: str) -> dict[str, Any]: + """Connect to a paired device. + + Establishes an active connection to a previously paired device. + For audio devices, this will also connect audio profiles. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + + Returns: + Connection status and device info + """ + client = await get_client() + try: + await client.connect_device(adapter, address) + device = await client.get_device(adapter, address) + return {"status": "connected", "device": asdict(device) if device else None} + except Exception as e: + return {"status": "error", "error": str(e)} + + @mcp.tool() + async def bt_disconnect(adapter: str, address: str) -> dict[str, Any]: + """Disconnect from a device. + + Terminates the active connection but preserves pairing. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + + Returns: + Status of the operation + """ + client = await get_client() + try: + await client.disconnect_device(adapter, address) + device = await client.get_device(adapter, address) + return {"status": "disconnected", "device": asdict(device) if device else None} + except Exception as e: + return {"status": "error", "error": str(e)} + + @mcp.tool() + async def bt_trust(adapter: str, address: str, trusted: bool) -> dict[str, Any]: + """Set device trust status. + + Trusted devices can connect automatically without explicit + authorization. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + trusted: True to trust, False to untrust + + Returns: + Updated device info + """ + client = await get_client() + try: + await client.set_device_trusted(adapter, address, trusted) + device = await client.get_device(adapter, address) + return {"status": "updated", "device": asdict(device) if device else None} + except Exception as e: + return {"status": "error", "error": str(e)} + + @mcp.tool() + async def bt_block(adapter: str, address: str, blocked: bool) -> dict[str, Any]: + """Block or unblock a device. + + Blocked devices cannot connect to this adapter. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + blocked: True to block, False to unblock + + Returns: + Updated device info + """ + client = await get_client() + try: + await client.set_device_blocked(adapter, address, blocked) + device = await client.get_device(adapter, address) + return {"status": "updated", "device": asdict(device) if device else None} + except Exception as e: + return {"status": "error", "error": str(e)} + + @mcp.tool() + async def bt_device_set_alias(adapter: str, address: str, alias: str) -> dict[str, Any]: + """Set a friendly name for a device. + + Args: + adapter: Adapter name (e.g., "hci0") + address: Device Bluetooth address + alias: New friendly name + + Returns: + Updated device info + """ + client = await get_client() + try: + await client.set_device_alias(adapter, address, alias) + device = await client.get_device(adapter, address) + return {"status": "updated", "device": asdict(device) if device else None} + except Exception as e: + return {"status": "error", "error": str(e)} diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..cc8b59e --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""Tests for mcbluetooth.""" diff --git a/tests/test_dbus_client.py b/tests/test_dbus_client.py new file mode 100644 index 0000000..b73551a --- /dev/null +++ b/tests/test_dbus_client.py @@ -0,0 +1,93 @@ +"""Tests for the BlueZ D-Bus client.""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + +from mcbluetooth.dbus_client import ( + BlueZClient, + AdapterInfo, + DeviceInfo, + address_to_path, + path_to_address, +) + + +class TestPathConversion: + """Test address/path conversion utilities.""" + + def test_address_to_path(self): + assert address_to_path("hci0", "AA:BB:CC:DD:EE:FF") == "/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF" + assert address_to_path("hci1", "11:22:33:44:55:66") == "/org/bluez/hci1/dev_11_22_33_44_55_66" + + def test_address_to_path_lowercase(self): + # Should uppercase the address + assert address_to_path("hci0", "aa:bb:cc:dd:ee:ff") == "/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF" + + def test_path_to_address(self): + assert path_to_address("/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF") == "AA:BB:CC:DD:EE:FF" + assert path_to_address("/org/bluez/hci1/dev_11_22_33_44_55_66") == "11:22:33:44:55:66" + + def test_path_to_address_invalid(self): + assert path_to_address("/org/bluez/hci0") == "" + assert path_to_address("/invalid/path") == "" + + +class TestAdapterInfo: + """Test AdapterInfo dataclass.""" + + def test_adapter_info_creation(self): + info = AdapterInfo( + path="/org/bluez/hci0", + name="hci0", + address="AA:BB:CC:DD:EE:FF", + alias="My Adapter", + powered=True, + discoverable=False, + discoverable_timeout=180, + pairable=True, + pairable_timeout=0, + discovering=False, + ) + assert info.name == "hci0" + assert info.powered is True + assert info.discoverable is False + + +class TestDeviceInfo: + """Test DeviceInfo dataclass.""" + + def test_device_info_creation(self): + info = DeviceInfo( + path="/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF", + adapter="hci0", + address="AA:BB:CC:DD:EE:FF", + name="Test Device", + alias="My Device", + paired=True, + bonded=True, + trusted=True, + blocked=False, + connected=True, + rssi=-50, + tx_power=4, + ) + assert info.address == "AA:BB:CC:DD:EE:FF" + assert info.paired is True + assert info.connected is True + assert info.rssi == -50 + + +class TestBlueZClient: + """Test BlueZClient methods with mocked D-Bus.""" + + @pytest.fixture + def client(self): + return BlueZClient() + + @pytest.mark.asyncio + async def test_client_not_connected_initially(self, client): + assert client._bus is None + assert client._object_manager is None + + # Additional tests would require more extensive mocking of dbus-fast + # which is better done as integration tests with a real BlueZ stack