diff --git a/README.md b/README.md index 29f892e..7858b0a 100644 --- a/README.md +++ b/README.md @@ -127,6 +127,18 @@ The server exposes dynamic resources for live state queries: | `bt_ble_notify` | Enable/disable notifications | | `bt_ble_battery` | Read battery level | +### Monitor Tools (btmon integration) +| Tool | Description | +|------|-------------| +| `bt_capture_start` | Start HCI traffic capture to btsnoop file | +| `bt_capture_stop` | Stop a running capture | +| `bt_capture_list_active` | List active captures | +| `bt_capture_parse` | Parse btsnoop file into structured packets | +| `bt_capture_analyze` | Analyze capture with btmon statistics | +| `bt_capture_read_raw` | Read human-readable decoded packets | + +> **Note:** Live capture requires elevated privileges. Run `sudo setcap cap_net_raw+ep /usr/bin/btmon` to enable without sudo. + ## Example Prompts ``` @@ -146,21 +158,21 @@ The server exposes dynamic resources for live state queries: ## Architecture ``` -┌─────────────────────────────────────────────┐ -│ FastMCP Server │ -├─────────────────────────────────────────────┤ -│ Tool Categories │ -│ ┌─────────┬─────────┬─────────┬─────────┐ │ -│ │ Adapter │ Device │ Audio │ BLE │ │ -│ │ Tools │ Tools │ Tools │ Tools │ │ -│ └─────────┴─────────┴─────────┴─────────┘ │ -├─────────────────────────────────────────────┤ -│ BlueZ D-Bus Client Layer │ -│ (dbus-fast) │ -├─────────────────────────────────────────────┤ -│ PipeWire/PulseAudio Integration │ -│ (pulsectl-asyncio) │ -└─────────────────────────────────────────────┘ +┌─────────────────────────────────────────────────────┐ +│ FastMCP Server │ +├─────────────────────────────────────────────────────┤ +│ Tool Categories │ +│ ┌─────────┬─────────┬─────────┬─────────┬────────┐ │ +│ │ Adapter │ Device │ Audio │ BLE │Monitor │ │ +│ │ Tools │ Tools │ Tools │ Tools │ Tools │ │ +│ └─────────┴─────────┴─────────┴─────────┴────────┘ │ +├─────────────────────────────────────────────────────┤ +│ BlueZ D-Bus Client │ btmon (HCI capture) │ +│ (dbus-fast) │ (btsnoop format) │ +├─────────────────────────────────────────────────────┤ +│ PipeWire/PulseAudio Integration │ +│ (pulsectl-asyncio) │ +└─────────────────────────────────────────────────────┘ ``` ## License diff --git a/src/mcbluetooth/server.py b/src/mcbluetooth/server.py index b237662..cccceaa 100644 --- a/src/mcbluetooth/server.py +++ b/src/mcbluetooth/server.py @@ -3,7 +3,7 @@ from fastmcp import FastMCP from mcbluetooth import resources -from mcbluetooth.tools import adapter, audio, ble, device +from mcbluetooth.tools import adapter, audio, ble, device, monitor mcp = FastMCP( name="mcbluetooth", @@ -43,6 +43,7 @@ adapter.register_tools(mcp) device.register_tools(mcp) audio.register_tools(mcp) ble.register_tools(mcp) +monitor.register_tools(mcp) def main(): diff --git a/src/mcbluetooth/tools/__init__.py b/src/mcbluetooth/tools/__init__.py index 3041833..9776b08 100644 --- a/src/mcbluetooth/tools/__init__.py +++ b/src/mcbluetooth/tools/__init__.py @@ -1,5 +1,5 @@ """MCP tool modules for Bluetooth management.""" -from mcbluetooth.tools import adapter, audio, ble, device +from mcbluetooth.tools import adapter, audio, ble, device, monitor -__all__ = ["adapter", "device", "audio", "ble"] +__all__ = ["adapter", "device", "audio", "ble", "monitor"] diff --git a/src/mcbluetooth/tools/monitor.py b/src/mcbluetooth/tools/monitor.py new file mode 100644 index 0000000..650158f --- /dev/null +++ b/src/mcbluetooth/tools/monitor.py @@ -0,0 +1,501 @@ +"""Bluetooth traffic monitoring tools using btmon. + +btmon is the BlueZ Bluetooth monitor that captures HCI traffic. +These tools provide MCP integration for: +- Starting/stopping packet captures +- Analyzing btsnoop capture files +- Parsing captured packets for protocol analysis + +Note: Live capture requires elevated privileges (CAP_NET_RAW or sudo). +""" + +from __future__ import annotations + +import asyncio +import shutil +import struct +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Literal + +from fastmcp import Context, FastMCP + +# btsnoop file format constants +BTSNOOP_MAGIC = b"btsnoop\x00" +BTSNOOP_VERSION = 1 +BTSNOOP_DATALINK_HCI = 1002 # HCI UART (H4) + +# HCI packet types +HCI_PACKET_TYPES = { + 0x01: "HCI_CMD", + 0x02: "ACL_DATA", + 0x03: "SCO_DATA", + 0x04: "HCI_EVENT", + 0x05: "ISO_DATA", +} + +# Track running captures +_running_captures: dict[str, asyncio.subprocess.Process] = {} + + +@dataclass +class BtsnoopPacket: + """A single packet from a btsnoop capture.""" + + index: int + timestamp: datetime + direction: Literal["TX", "RX"] + packet_type: str + data: bytes + original_length: int + included_length: int + + def hex_dump(self, max_bytes: int = 32) -> str: + """Return hex representation of packet data.""" + data = self.data[:max_bytes] + hex_str = data.hex() + # Format as pairs with spaces + pairs = [hex_str[i : i + 2] for i in range(0, len(hex_str), 2)] + result = " ".join(pairs) + if len(self.data) > max_bytes: + result += f" ... ({len(self.data)} bytes total)" + return result + + +def parse_btsnoop_file(filepath: str, max_packets: int = 100) -> list[BtsnoopPacket]: + """Parse a btsnoop capture file and return packets. + + Args: + filepath: Path to the btsnoop file + max_packets: Maximum number of packets to return (0 for all) + + Returns: + List of BtsnoopPacket objects + """ + packets = [] + path = Path(filepath) + + if not path.exists(): + raise FileNotFoundError(f"Capture file not found: {filepath}") + + with open(path, "rb") as f: + # Read and validate header + magic = f.read(8) + if magic != BTSNOOP_MAGIC: + raise ValueError(f"Not a valid btsnoop file: {filepath}") + + version, datalink = struct.unpack(">II", f.read(8)) + if version != BTSNOOP_VERSION: + raise ValueError(f"Unsupported btsnoop version: {version}") + + # Read packets + packet_index = 0 + while True: + if max_packets > 0 and packet_index >= max_packets: + break + + # Read packet header (24 bytes) + header = f.read(24) + if len(header) < 24: + break # EOF + + ( + original_length, + included_length, + packet_flags, + cumulative_drops, + timestamp_us, + ) = struct.unpack(">IIIIQ", header) + + # Read packet data + data = f.read(included_length) + if len(data) < included_length: + break # Truncated + + # Parse flags + # Bit 0: 0=TX (host->controller), 1=RX (controller->host) + # Bit 1: 0=data, 1=command/event + direction = "RX" if (packet_flags & 0x01) else "TX" + + # First byte is HCI packet type indicator (H4 protocol) + if data: + pkt_type_byte = data[0] + packet_type = HCI_PACKET_TYPES.get(pkt_type_byte, f"UNKNOWN_0x{pkt_type_byte:02x}") + else: + packet_type = "EMPTY" + + # Convert timestamp (microseconds since epoch 2000-01-01) + # btsnoop uses a different epoch than Unix + btsnoop_epoch = datetime(2000, 1, 1) + timestamp = btsnoop_epoch.replace( + microsecond=0 + ) + __import__("datetime").timedelta(microseconds=timestamp_us) + + packets.append( + BtsnoopPacket( + index=packet_index, + timestamp=timestamp, + direction=direction, + packet_type=packet_type, + data=data, + original_length=original_length, + included_length=included_length, + ) + ) + packet_index += 1 + + return packets + + +def register_tools(mcp: FastMCP) -> None: + """Register btmon monitoring tools with the MCP server.""" + + @mcp.tool() + async def bt_capture_start( + output_file: str, + adapter: str | None = None, + include_sco: bool = False, + include_a2dp: bool = False, + include_iso: bool = False, + ctx: Context | None = None, + ) -> dict: + """Start capturing Bluetooth HCI traffic to a btsnoop file. + + Requires elevated privileges (sudo or CAP_NET_RAW on btmon). + The capture runs in the background until bt_capture_stop is called. + + Args: + output_file: Path to save the btsnoop capture file + adapter: Adapter index to capture (e.g., "0" for hci0), or None for all + include_sco: Include SCO (voice) traffic in capture + include_a2dp: Include A2DP (audio streaming) traffic in capture + include_iso: Include ISO (LE Audio) traffic in capture + + Returns: + Status dict with capture_id if started successfully + """ + # Check if btmon exists + btmon_path = shutil.which("btmon") + if not btmon_path: + return { + "success": False, + "error": "btmon not found. Install bluez-utils package.", + } + + # Expand path + output_path = Path(output_file).expanduser().resolve() + output_path.parent.mkdir(parents=True, exist_ok=True) + + # Build command + cmd = ["btmon", "-w", str(output_path)] + + if adapter is not None: + cmd.extend(["-i", str(adapter)]) + if include_sco: + cmd.append("-S") + if include_a2dp: + cmd.append("-A") + if include_iso: + cmd.append("-I") + + # Generate capture ID + capture_id = f"capture_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + if ctx: + await ctx.info(f"Starting btmon capture to {output_path}") + + try: + # Try without sudo first + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE, + ) + + # Wait briefly to see if it fails immediately + await asyncio.sleep(0.5) + + if process.returncode is not None: + # Process exited - likely permission error + stderr = await process.stderr.read() if process.stderr else b"" + error_msg = stderr.decode().strip() + + if "Operation not permitted" in error_msg: + return { + "success": False, + "error": "Permission denied. btmon needs elevated privileges.", + "solutions": [ + "Run: sudo setcap cap_net_raw+ep /usr/bin/btmon", + "Or add to sudoers: %wheel ALL=(ALL) NOPASSWD: /usr/bin/btmon", + "Or run capture manually: sudo btmon -w " + str(output_path), + ], + } + return {"success": False, "error": error_msg} + + # Success - store process + _running_captures[capture_id] = process + + return { + "success": True, + "capture_id": capture_id, + "output_file": str(output_path), + "message": f"Capture started. Use bt_capture_stop('{capture_id}') to stop.", + } + + except Exception as e: + return {"success": False, "error": str(e)} + + @mcp.tool() + async def bt_capture_stop( + capture_id: str, + ctx: Context | None = None, + ) -> dict: + """Stop a running Bluetooth capture. + + Args: + capture_id: The capture ID returned by bt_capture_start + + Returns: + Status dict with capture file info + """ + if capture_id not in _running_captures: + return { + "success": False, + "error": f"No capture found with ID: {capture_id}", + "active_captures": list(_running_captures.keys()), + } + + process = _running_captures[capture_id] + + if ctx: + await ctx.info(f"Stopping capture {capture_id}") + + try: + process.terminate() + await asyncio.wait_for(process.wait(), timeout=5.0) + except TimeoutError: + process.kill() + await process.wait() + + del _running_captures[capture_id] + + return { + "success": True, + "capture_id": capture_id, + "message": "Capture stopped successfully", + } + + @mcp.tool() + async def bt_capture_list_active() -> dict: + """List all active Bluetooth captures. + + Returns: + Dict with list of active capture IDs + """ + return { + "active_captures": list(_running_captures.keys()), + "count": len(_running_captures), + } + + @mcp.tool() + async def bt_capture_parse( + filepath: str, + max_packets: int = 100, + packet_type_filter: str | None = None, + direction_filter: Literal["TX", "RX"] | None = None, + ctx: Context | None = None, + ) -> dict: + """Parse a btsnoop capture file and return packet summaries. + + Args: + filepath: Path to the btsnoop capture file + max_packets: Maximum packets to return (default 100, 0 for all) + packet_type_filter: Filter by packet type (HCI_CMD, ACL_DATA, HCI_EVENT, etc.) + direction_filter: Filter by direction (TX or RX) + + Returns: + Dict with parsed packets and statistics + """ + path = Path(filepath).expanduser().resolve() + + if ctx: + await ctx.info(f"Parsing btsnoop file: {path}") + + try: + packets = parse_btsnoop_file(str(path), max_packets=0) # Parse all first + except FileNotFoundError: + return {"success": False, "error": f"File not found: {path}"} + except ValueError as e: + return {"success": False, "error": str(e)} + + # Apply filters + if packet_type_filter: + packets = [p for p in packets if p.packet_type == packet_type_filter] + if direction_filter: + packets = [p for p in packets if p.direction == direction_filter] + + # Limit output + total_count = len(packets) + if max_packets > 0: + packets = packets[:max_packets] + + # Build statistics + stats = { + "total_packets": total_count, + "returned_packets": len(packets), + "by_type": {}, + "by_direction": {"TX": 0, "RX": 0}, + } + + for p in packets: + stats["by_type"][p.packet_type] = stats["by_type"].get(p.packet_type, 0) + 1 + stats["by_direction"][p.direction] += 1 + + # Format packets for output + packet_list = [ + { + "index": p.index, + "timestamp": p.timestamp.isoformat(), + "direction": p.direction, + "type": p.packet_type, + "length": p.included_length, + "hex": p.hex_dump(32), + } + for p in packets + ] + + return { + "success": True, + "filepath": str(path), + "statistics": stats, + "packets": packet_list, + } + + @mcp.tool() + async def bt_capture_analyze( + filepath: str, + ctx: Context | None = None, + ) -> dict: + """Analyze a btsnoop capture file using btmon's analysis feature. + + Provides high-level statistics about the capture including + packet counts, timing analysis, and protocol distribution. + + Args: + filepath: Path to the btsnoop capture file + + Returns: + Dict with analysis results + """ + path = Path(filepath).expanduser().resolve() + + if not path.exists(): + return {"success": False, "error": f"File not found: {path}"} + + btmon_path = shutil.which("btmon") + if not btmon_path: + return {"success": False, "error": "btmon not found"} + + if ctx: + await ctx.info(f"Analyzing capture: {path}") + + try: + process = await asyncio.create_subprocess_exec( + "btmon", + "-a", + str(path), + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30.0) + + if process.returncode != 0: + return { + "success": False, + "error": stderr.decode().strip() or "Analysis failed", + } + + return { + "success": True, + "filepath": str(path), + "analysis": stdout.decode(), + } + + except TimeoutError: + return {"success": False, "error": "Analysis timed out"} + except Exception as e: + return {"success": False, "error": str(e)} + + @mcp.tool() + async def bt_capture_read_raw( + filepath: str, + offset: int = 0, + count: int = 50, + ctx: Context | None = None, + ) -> dict: + """Read raw packet data from a btsnoop file using btmon. + + Displays human-readable packet decoding from btmon. + + Args: + filepath: Path to the btsnoop capture file + offset: Number of packets to skip from the beginning + count: Number of packets to display + + Returns: + Dict with decoded packet output + """ + path = Path(filepath).expanduser().resolve() + + if not path.exists(): + return {"success": False, "error": f"File not found: {path}"} + + btmon_path = shutil.which("btmon") + if not btmon_path: + return {"success": False, "error": "btmon not found"} + + if ctx: + await ctx.info(f"Reading capture: {path}") + + try: + # btmon -r reads and decodes the file + process = await asyncio.create_subprocess_exec( + "btmon", + "-r", + str(path), + "-T", # Show timestamps + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30.0) + + output = stdout.decode() + + # Split into lines and apply offset/count + lines = output.split("\n") + + # Find packet boundaries (lines starting with timing info) + # This is a simple heuristic - btmon output has specific format + if offset > 0 or count > 0: + # Just return subset of lines for now + # A more sophisticated approach would parse packet boundaries + total_lines = len(lines) + start = min(offset * 10, total_lines) # Rough estimate: 10 lines per packet + end = min(start + count * 10, total_lines) + lines = lines[start:end] + output = "\n".join(lines) + + return { + "success": True, + "filepath": str(path), + "output": output, + "note": "Use bt_capture_parse for structured packet data", + } + + except TimeoutError: + return {"success": False, "error": "Read timed out"} + except Exception as e: + return {"success": False, "error": str(e)}