Add btmon integration for HCI traffic capture and analysis

New monitor tools:
- bt_capture_start: Start background HCI capture to btsnoop file
- bt_capture_stop: Stop running capture by ID
- bt_capture_list_active: List active captures
- bt_capture_parse: Parse btsnoop into structured packet data
- bt_capture_analyze: Run btmon analysis on capture file
- bt_capture_read_raw: Read decoded packets via btmon

Features:
- Native btsnoop file parsing (no btmon needed for parse)
- Filter by packet type (HCI_CMD, ACL_DATA, HCI_EVENT, etc.)
- Filter by direction (TX/RX)
- Statistics and hex dump output

Note: Live capture requires CAP_NET_RAW or sudo.
This commit is contained in:
Ryan Malloy 2026-02-02 11:45:02 -07:00
parent 3e3d77068b
commit cd03fa9253
4 changed files with 532 additions and 18 deletions

View File

@ -127,6 +127,18 @@ The server exposes dynamic resources for live state queries:
| `bt_ble_notify` | Enable/disable notifications |
| `bt_ble_battery` | Read battery level |
### Monitor Tools (btmon integration)
| Tool | Description |
|------|-------------|
| `bt_capture_start` | Start HCI traffic capture to btsnoop file |
| `bt_capture_stop` | Stop a running capture |
| `bt_capture_list_active` | List active captures |
| `bt_capture_parse` | Parse btsnoop file into structured packets |
| `bt_capture_analyze` | Analyze capture with btmon statistics |
| `bt_capture_read_raw` | Read human-readable decoded packets |
> **Note:** Live capture requires elevated privileges. Run `sudo setcap cap_net_raw+ep /usr/bin/btmon` to enable without sudo.
## Example Prompts
```
@ -146,21 +158,21 @@ The server exposes dynamic resources for live state queries:
## Architecture
```
┌─────────────────────────────────────────────┐
│ FastMCP Server │
├─────────────────────────────────────────────┤
│ Tool Categories │
│ ┌─────────┬─────────┬─────────┬─────────┐
│ │ Adapter │ Device │ Audio │ BLE │ │
│ │ Tools │ Tools │ Tools │ Tools │ │
│ └─────────┴─────────┴─────────┴─────────┘
├─────────────────────────────────────────────┤
BlueZ D-Bus Client Layer
(dbus-fast) │
├─────────────────────────────────────────────┤
│ PipeWire/PulseAudio Integration │
│ (pulsectl-asyncio) │
└─────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────
FastMCP Server
├─────────────────────────────────────────────────────
Tool Categories
│ ┌─────────┬─────────┬─────────┬─────────┬────────┐ │
│ │ Adapter │ Device │ Audio │ BLE │Monitor
│ │ Tools │ Tools │ Tools │ Tools │ Tools │
│ └─────────┴─────────┴─────────┴─────────┴────────┘ │
├─────────────────────────────────────────────────────
│ BlueZ D-Bus Client btmon (HCI capture)
│ (dbus-fast) (btsnoop format)
├─────────────────────────────────────────────────────
PipeWire/PulseAudio Integration
(pulsectl-asyncio)
└─────────────────────────────────────────────────────
```
## License

View File

@ -3,7 +3,7 @@
from fastmcp import FastMCP
from mcbluetooth import resources
from mcbluetooth.tools import adapter, audio, ble, device
from mcbluetooth.tools import adapter, audio, ble, device, monitor
mcp = FastMCP(
name="mcbluetooth",
@ -43,6 +43,7 @@ adapter.register_tools(mcp)
device.register_tools(mcp)
audio.register_tools(mcp)
ble.register_tools(mcp)
monitor.register_tools(mcp)
def main():

View File

@ -1,5 +1,5 @@
"""MCP tool modules for Bluetooth management."""
from mcbluetooth.tools import adapter, audio, ble, device
from mcbluetooth.tools import adapter, audio, ble, device, monitor
__all__ = ["adapter", "device", "audio", "ble"]
__all__ = ["adapter", "device", "audio", "ble", "monitor"]

View File

@ -0,0 +1,501 @@
"""Bluetooth traffic monitoring tools using btmon.
btmon is the BlueZ Bluetooth monitor that captures HCI traffic.
These tools provide MCP integration for:
- Starting/stopping packet captures
- Analyzing btsnoop capture files
- Parsing captured packets for protocol analysis
Note: Live capture requires elevated privileges (CAP_NET_RAW or sudo).
"""
from __future__ import annotations
import asyncio
import shutil
import struct
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Literal
from fastmcp import Context, FastMCP
# btsnoop file format constants
BTSNOOP_MAGIC = b"btsnoop\x00"
BTSNOOP_VERSION = 1
BTSNOOP_DATALINK_HCI = 1002 # HCI UART (H4)
# HCI packet types
HCI_PACKET_TYPES = {
0x01: "HCI_CMD",
0x02: "ACL_DATA",
0x03: "SCO_DATA",
0x04: "HCI_EVENT",
0x05: "ISO_DATA",
}
# Track running captures
_running_captures: dict[str, asyncio.subprocess.Process] = {}
@dataclass
class BtsnoopPacket:
"""A single packet from a btsnoop capture."""
index: int
timestamp: datetime
direction: Literal["TX", "RX"]
packet_type: str
data: bytes
original_length: int
included_length: int
def hex_dump(self, max_bytes: int = 32) -> str:
"""Return hex representation of packet data."""
data = self.data[:max_bytes]
hex_str = data.hex()
# Format as pairs with spaces
pairs = [hex_str[i : i + 2] for i in range(0, len(hex_str), 2)]
result = " ".join(pairs)
if len(self.data) > max_bytes:
result += f" ... ({len(self.data)} bytes total)"
return result
def parse_btsnoop_file(filepath: str, max_packets: int = 100) -> list[BtsnoopPacket]:
"""Parse a btsnoop capture file and return packets.
Args:
filepath: Path to the btsnoop file
max_packets: Maximum number of packets to return (0 for all)
Returns:
List of BtsnoopPacket objects
"""
packets = []
path = Path(filepath)
if not path.exists():
raise FileNotFoundError(f"Capture file not found: {filepath}")
with open(path, "rb") as f:
# Read and validate header
magic = f.read(8)
if magic != BTSNOOP_MAGIC:
raise ValueError(f"Not a valid btsnoop file: {filepath}")
version, datalink = struct.unpack(">II", f.read(8))
if version != BTSNOOP_VERSION:
raise ValueError(f"Unsupported btsnoop version: {version}")
# Read packets
packet_index = 0
while True:
if max_packets > 0 and packet_index >= max_packets:
break
# Read packet header (24 bytes)
header = f.read(24)
if len(header) < 24:
break # EOF
(
original_length,
included_length,
packet_flags,
cumulative_drops,
timestamp_us,
) = struct.unpack(">IIIIQ", header)
# Read packet data
data = f.read(included_length)
if len(data) < included_length:
break # Truncated
# Parse flags
# Bit 0: 0=TX (host->controller), 1=RX (controller->host)
# Bit 1: 0=data, 1=command/event
direction = "RX" if (packet_flags & 0x01) else "TX"
# First byte is HCI packet type indicator (H4 protocol)
if data:
pkt_type_byte = data[0]
packet_type = HCI_PACKET_TYPES.get(pkt_type_byte, f"UNKNOWN_0x{pkt_type_byte:02x}")
else:
packet_type = "EMPTY"
# Convert timestamp (microseconds since epoch 2000-01-01)
# btsnoop uses a different epoch than Unix
btsnoop_epoch = datetime(2000, 1, 1)
timestamp = btsnoop_epoch.replace(
microsecond=0
) + __import__("datetime").timedelta(microseconds=timestamp_us)
packets.append(
BtsnoopPacket(
index=packet_index,
timestamp=timestamp,
direction=direction,
packet_type=packet_type,
data=data,
original_length=original_length,
included_length=included_length,
)
)
packet_index += 1
return packets
def register_tools(mcp: FastMCP) -> None:
"""Register btmon monitoring tools with the MCP server."""
@mcp.tool()
async def bt_capture_start(
output_file: str,
adapter: str | None = None,
include_sco: bool = False,
include_a2dp: bool = False,
include_iso: bool = False,
ctx: Context | None = None,
) -> dict:
"""Start capturing Bluetooth HCI traffic to a btsnoop file.
Requires elevated privileges (sudo or CAP_NET_RAW on btmon).
The capture runs in the background until bt_capture_stop is called.
Args:
output_file: Path to save the btsnoop capture file
adapter: Adapter index to capture (e.g., "0" for hci0), or None for all
include_sco: Include SCO (voice) traffic in capture
include_a2dp: Include A2DP (audio streaming) traffic in capture
include_iso: Include ISO (LE Audio) traffic in capture
Returns:
Status dict with capture_id if started successfully
"""
# Check if btmon exists
btmon_path = shutil.which("btmon")
if not btmon_path:
return {
"success": False,
"error": "btmon not found. Install bluez-utils package.",
}
# Expand path
output_path = Path(output_file).expanduser().resolve()
output_path.parent.mkdir(parents=True, exist_ok=True)
# Build command
cmd = ["btmon", "-w", str(output_path)]
if adapter is not None:
cmd.extend(["-i", str(adapter)])
if include_sco:
cmd.append("-S")
if include_a2dp:
cmd.append("-A")
if include_iso:
cmd.append("-I")
# Generate capture ID
capture_id = f"capture_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
if ctx:
await ctx.info(f"Starting btmon capture to {output_path}")
try:
# Try without sudo first
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.DEVNULL,
stderr=asyncio.subprocess.PIPE,
)
# Wait briefly to see if it fails immediately
await asyncio.sleep(0.5)
if process.returncode is not None:
# Process exited - likely permission error
stderr = await process.stderr.read() if process.stderr else b""
error_msg = stderr.decode().strip()
if "Operation not permitted" in error_msg:
return {
"success": False,
"error": "Permission denied. btmon needs elevated privileges.",
"solutions": [
"Run: sudo setcap cap_net_raw+ep /usr/bin/btmon",
"Or add to sudoers: %wheel ALL=(ALL) NOPASSWD: /usr/bin/btmon",
"Or run capture manually: sudo btmon -w " + str(output_path),
],
}
return {"success": False, "error": error_msg}
# Success - store process
_running_captures[capture_id] = process
return {
"success": True,
"capture_id": capture_id,
"output_file": str(output_path),
"message": f"Capture started. Use bt_capture_stop('{capture_id}') to stop.",
}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool()
async def bt_capture_stop(
capture_id: str,
ctx: Context | None = None,
) -> dict:
"""Stop a running Bluetooth capture.
Args:
capture_id: The capture ID returned by bt_capture_start
Returns:
Status dict with capture file info
"""
if capture_id not in _running_captures:
return {
"success": False,
"error": f"No capture found with ID: {capture_id}",
"active_captures": list(_running_captures.keys()),
}
process = _running_captures[capture_id]
if ctx:
await ctx.info(f"Stopping capture {capture_id}")
try:
process.terminate()
await asyncio.wait_for(process.wait(), timeout=5.0)
except TimeoutError:
process.kill()
await process.wait()
del _running_captures[capture_id]
return {
"success": True,
"capture_id": capture_id,
"message": "Capture stopped successfully",
}
@mcp.tool()
async def bt_capture_list_active() -> dict:
"""List all active Bluetooth captures.
Returns:
Dict with list of active capture IDs
"""
return {
"active_captures": list(_running_captures.keys()),
"count": len(_running_captures),
}
@mcp.tool()
async def bt_capture_parse(
filepath: str,
max_packets: int = 100,
packet_type_filter: str | None = None,
direction_filter: Literal["TX", "RX"] | None = None,
ctx: Context | None = None,
) -> dict:
"""Parse a btsnoop capture file and return packet summaries.
Args:
filepath: Path to the btsnoop capture file
max_packets: Maximum packets to return (default 100, 0 for all)
packet_type_filter: Filter by packet type (HCI_CMD, ACL_DATA, HCI_EVENT, etc.)
direction_filter: Filter by direction (TX or RX)
Returns:
Dict with parsed packets and statistics
"""
path = Path(filepath).expanduser().resolve()
if ctx:
await ctx.info(f"Parsing btsnoop file: {path}")
try:
packets = parse_btsnoop_file(str(path), max_packets=0) # Parse all first
except FileNotFoundError:
return {"success": False, "error": f"File not found: {path}"}
except ValueError as e:
return {"success": False, "error": str(e)}
# Apply filters
if packet_type_filter:
packets = [p for p in packets if p.packet_type == packet_type_filter]
if direction_filter:
packets = [p for p in packets if p.direction == direction_filter]
# Limit output
total_count = len(packets)
if max_packets > 0:
packets = packets[:max_packets]
# Build statistics
stats = {
"total_packets": total_count,
"returned_packets": len(packets),
"by_type": {},
"by_direction": {"TX": 0, "RX": 0},
}
for p in packets:
stats["by_type"][p.packet_type] = stats["by_type"].get(p.packet_type, 0) + 1
stats["by_direction"][p.direction] += 1
# Format packets for output
packet_list = [
{
"index": p.index,
"timestamp": p.timestamp.isoformat(),
"direction": p.direction,
"type": p.packet_type,
"length": p.included_length,
"hex": p.hex_dump(32),
}
for p in packets
]
return {
"success": True,
"filepath": str(path),
"statistics": stats,
"packets": packet_list,
}
@mcp.tool()
async def bt_capture_analyze(
filepath: str,
ctx: Context | None = None,
) -> dict:
"""Analyze a btsnoop capture file using btmon's analysis feature.
Provides high-level statistics about the capture including
packet counts, timing analysis, and protocol distribution.
Args:
filepath: Path to the btsnoop capture file
Returns:
Dict with analysis results
"""
path = Path(filepath).expanduser().resolve()
if not path.exists():
return {"success": False, "error": f"File not found: {path}"}
btmon_path = shutil.which("btmon")
if not btmon_path:
return {"success": False, "error": "btmon not found"}
if ctx:
await ctx.info(f"Analyzing capture: {path}")
try:
process = await asyncio.create_subprocess_exec(
"btmon",
"-a",
str(path),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30.0)
if process.returncode != 0:
return {
"success": False,
"error": stderr.decode().strip() or "Analysis failed",
}
return {
"success": True,
"filepath": str(path),
"analysis": stdout.decode(),
}
except TimeoutError:
return {"success": False, "error": "Analysis timed out"}
except Exception as e:
return {"success": False, "error": str(e)}
@mcp.tool()
async def bt_capture_read_raw(
filepath: str,
offset: int = 0,
count: int = 50,
ctx: Context | None = None,
) -> dict:
"""Read raw packet data from a btsnoop file using btmon.
Displays human-readable packet decoding from btmon.
Args:
filepath: Path to the btsnoop capture file
offset: Number of packets to skip from the beginning
count: Number of packets to display
Returns:
Dict with decoded packet output
"""
path = Path(filepath).expanduser().resolve()
if not path.exists():
return {"success": False, "error": f"File not found: {path}"}
btmon_path = shutil.which("btmon")
if not btmon_path:
return {"success": False, "error": "btmon not found"}
if ctx:
await ctx.info(f"Reading capture: {path}")
try:
# btmon -r reads and decodes the file
process = await asyncio.create_subprocess_exec(
"btmon",
"-r",
str(path),
"-T", # Show timestamps
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=30.0)
output = stdout.decode()
# Split into lines and apply offset/count
lines = output.split("\n")
# Find packet boundaries (lines starting with timing info)
# This is a simple heuristic - btmon output has specific format
if offset > 0 or count > 0:
# Just return subset of lines for now
# A more sophisticated approach would parse packet boundaries
total_lines = len(lines)
start = min(offset * 10, total_lines) # Rough estimate: 10 lines per packet
end = min(start + count * 10, total_lines)
lines = lines[start:end]
output = "\n".join(lines)
return {
"success": True,
"filepath": str(path),
"output": output,
"note": "Use bt_capture_parse for structured packet data",
}
except TimeoutError:
return {"success": False, "error": "Read timed out"}
except Exception as e:
return {"success": False, "error": str(e)}