Add HFP Audio Gateway profile for E2E headset testing

Implements the AG (phone) role via BlueZ ProfileManager1, allowing
Linux to simulate a phone for testing HF devices like our ESP32 harness.

Core module (hfp_ag.py):
- Profile1 D-Bus service with RFCOMM socket handling
- Full AT command protocol: SLC negotiation (BRSF, CIND, CMER, CHLD),
  call control, codec negotiation (CVSD/mSBC), volume, voice recognition
- AG-initiated actions: simulate incoming calls, end calls, set indicators

MCP tools (8): enable/disable, status, simulate_call, end_call,
set_volume, set_signal, set_battery

Avoids `from __future__ import annotations` which breaks dbus-fast's
@method() decorator annotation parsing.
This commit is contained in:
Ryan Malloy 2026-02-03 15:14:54 -07:00
parent 31b911febd
commit 1bfb7b57ee
3 changed files with 981 additions and 1 deletions

767
src/mcbluetooth/hfp_ag.py Normal file
View File

@ -0,0 +1,767 @@
"""HFP Audio Gateway implementation for BlueZ.
Registers as an HFP AG (phone role) via BlueZ ProfileManager1, then speaks
the HFP AT command protocol over the RFCOMM socket handed to us by BlueZ
when an HF device (headset) connects.
This allows Linux to simulate a phone for E2E testing of Bluetooth headsets
(like our ESP32 test harness acting as a Hands-Free Unit).
HFP AT command flow (simplified):
1. HF connects BlueZ calls NewConnection with RFCOMM fd
2. SLC negotiation: BRSF features, CIND indicators, CMER enable
3. AG can then simulate: RING (incoming call), +CLIP (caller ID)
4. HF responds: ATA (answer), AT+CHUP (hangup), AT+VGS (volume)
"""
import asyncio
import logging
import os
import socket
from dataclasses import dataclass, field
from enum import Enum, IntFlag
from typing import Any
from dbus_fast import BusType, Variant
from dbus_fast.aio import MessageBus
from dbus_fast.service import ServiceInterface, method
log = logging.getLogger(__name__)
# D-Bus constants
BLUEZ_SERVICE = "org.bluez"
PROFILE_MANAGER_IFACE = "org.bluez.ProfileManager1"
HFP_AG_UUID = "0000111f-0000-1000-8000-00805f9b34fb"
HFP_AG_PROFILE_PATH = "/mcbluetooth/hfp_ag"
# HFP 1.7 AG feature flags
HFP_AG_FEATURES = (
(1 << 0) # Three-way calling
| (1 << 1) # EC/NR function
| (1 << 2) # Voice recognition
| (1 << 3) # In-band ring tone
| (1 << 4) # Voice tag
| (1 << 5) # Reject call
| (1 << 6) # Enhanced call status
| (1 << 7) # Enhanced call control
| (1 << 8) # Extended error result codes
| (1 << 9) # Codec negotiation
)
class CallState(Enum):
IDLE = "idle"
INCOMING = "incoming" # ringing
OUTGOING = "outgoing" # dialing
ALERTING = "alerting" # remote ringing
ACTIVE = "active" # in call
HELD = "held"
class CallDirection(Enum):
OUTGOING = 0
INCOMING = 1
@dataclass
class CallInfo:
index: int
direction: CallDirection
state: CallState
number: str = ""
number_type: int = 129 # 129=unknown, 145=international
@dataclass
class HFPConnection:
"""State for a single HFP connection to an HF device."""
device_path: str
address: str
fd: int
sock: socket.socket | None = None
reader: asyncio.StreamReader | None = None
writer: asyncio.StreamWriter | None = None
slc_established: bool = False
codec: str = "cvsd" # cvsd or msbc
hf_features: int = 0
speaker_volume: int = 7
mic_volume: int = 7
calls: list[CallInfo] = field(default_factory=list)
_read_task: asyncio.Task | None = field(default=None, repr=False)
_ring_task: asyncio.Task | None = field(default=None, repr=False)
# HFP indicator definitions (order matters — index referenced by +CIEV)
# (name, range, initial_value)
INDICATORS = [
("service", (0, 1), 1), # 1: network available
("call", (0, 1), 0), # 2: active call
("callsetup", (0, 3), 0), # 3: 0=none, 1=incoming, 2=outgoing, 3=alerting
("callheld", (0, 2), 0), # 4: 0=none, 1=held+active, 2=held only
("signal", (0, 5), 5), # 5: signal strength
("roam", (0, 1), 0), # 6: roaming
("battchg", (0, 5), 5), # 7: battery level
]
def _path_to_address(device_path: str) -> str:
parts = device_path.split("/")
if len(parts) >= 5 and parts[-1].startswith("dev_"):
return parts[-1][4:].replace("_", ":")
return device_path
class HFPAudioGatewayProfile(ServiceInterface):
"""D-Bus Profile1 service for HFP Audio Gateway role."""
def __init__(self) -> None:
super().__init__("org.bluez.Profile1")
self.connections: dict[str, HFPConnection] = {} # keyed by device_path
self.indicator_values = [ind[2] for ind in INDICATORS]
self._event_callbacks: list[Any] = []
def on_event(self, callback):
"""Register a callback for HFP events: callback(event_type, data)."""
self._event_callbacks.append(callback)
def _emit(self, event_type: str, data: dict[str, Any]) -> None:
for cb in self._event_callbacks:
try:
cb(event_type, data)
except Exception:
pass
@method()
def Release(self) -> None:
log.info("HFP AG profile released")
for conn in list(self.connections.values()):
self._cleanup_connection(conn)
self.connections.clear()
@method()
def NewConnection(self, device: "o", fd: "h", properties: "a{sv}") -> None:
address = _path_to_address(device)
log.info("HFP AG: new connection from %s (fd=%d)", address, fd)
# Duplicate the fd so we own it
new_fd = os.dup(fd)
conn = HFPConnection(
device_path=device,
address=address,
fd=new_fd,
)
self.connections[device] = conn
# Start async AT command handler
loop = asyncio.get_event_loop()
conn._read_task = loop.create_task(self._handle_connection(conn))
self._emit("hfp_ag_connect", {"address": address})
@method()
def RequestDisconnection(self, device: "o") -> None:
address = _path_to_address(device)
log.info("HFP AG: disconnect requested for %s", address)
conn = self.connections.pop(device, None)
if conn:
self._cleanup_connection(conn)
self._emit("hfp_ag_disconnect", {"address": address})
def _cleanup_connection(self, conn: HFPConnection) -> None:
if conn._ring_task and not conn._ring_task.done():
conn._ring_task.cancel()
if conn._read_task and not conn._read_task.done():
conn._read_task.cancel()
if conn.writer:
try:
conn.writer.close()
except Exception:
pass
if conn.sock:
try:
conn.sock.close()
except Exception:
pass
else:
try:
os.close(conn.fd)
except Exception:
pass
# ==================== AT Command Protocol ====================
async def _handle_connection(self, conn: HFPConnection) -> None:
"""Read loop for AT commands from the HF device."""
try:
conn.sock = socket.fromfd(conn.fd, socket.AF_BLUETOOTH, socket.SOCK_STREAM)
conn.sock.setblocking(False)
loop = asyncio.get_event_loop()
conn.reader, conn.writer = await asyncio.open_connection(sock=conn.sock)
buf = b""
while True:
data = await conn.reader.read(1024)
if not data:
break
buf += data
while b"\r" in buf:
line, buf = buf.split(b"\r", 1)
# Strip leading \n if present
line = line.lstrip(b"\n").strip()
if line:
await self._process_at_command(conn, line.decode("utf-8", errors="replace"))
except (ConnectionResetError, BrokenPipeError, OSError):
log.debug("HFP AG: connection closed for %s", conn.address)
except asyncio.CancelledError:
pass
except Exception:
log.exception("HFP AG: error handling connection for %s", conn.address)
finally:
self.connections.pop(conn.device_path, None)
self._cleanup_connection(conn)
self._emit("hfp_ag_disconnect", {"address": conn.address})
async def _send(self, conn: HFPConnection, response: str) -> None:
"""Send an AT response to the HF device."""
if conn.writer and not conn.writer.is_closing():
conn.writer.write(f"\r\n{response}\r\n".encode())
await conn.writer.drain()
log.debug("HFP AG → %s: %s", conn.address, response)
async def _send_ok(self, conn: HFPConnection) -> None:
await self._send(conn, "OK")
async def _send_error(self, conn: HFPConnection) -> None:
await self._send(conn, "ERROR")
async def _process_at_command(self, conn: HFPConnection, line: str) -> None:
"""Parse and handle an AT command from the HF."""
log.debug("HFP AG ← %s: %s", conn.address, line)
cmd = line.strip().upper()
self._emit("hfp_ag_at_recv", {"address": conn.address, "command": line.strip()})
# SLC establishment commands
if cmd.startswith("AT+BRSF="):
await self._handle_brsf(conn, line)
elif cmd == "AT+CIND=?":
await self._handle_cind_test(conn)
elif cmd == "AT+CIND?":
await self._handle_cind_read(conn)
elif cmd.startswith("AT+CMER="):
await self._handle_cmer(conn, line)
elif cmd.startswith("AT+CHLD=?"):
await self._handle_chld_test(conn)
# Codec negotiation
elif cmd.startswith("AT+BAC="):
await self._handle_bac(conn, line)
elif cmd.startswith("AT+BCS="):
await self._handle_bcs(conn, line)
# Call control
elif cmd == "ATA":
await self._handle_answer(conn)
elif cmd == "AT+CHUP":
await self._handle_hangup(conn)
elif cmd.startswith("ATD"):
await self._handle_dial(conn, line)
elif cmd.startswith("AT+DTMF="):
await self._handle_dtmf(conn, line)
# Volume
elif cmd.startswith("AT+VGS="):
await self._handle_vgs(conn, line)
elif cmd.startswith("AT+VGM="):
await self._handle_vgm(conn, line)
# Status queries
elif cmd.startswith("AT+CLCC"):
await self._handle_clcc(conn)
elif cmd.startswith("AT+COPS"):
await self._handle_cops(conn, line)
elif cmd.startswith("AT+CNUM"):
await self._handle_cnum(conn)
# Voice recognition
elif cmd.startswith("AT+BVRA="):
await self._handle_bvra(conn, line)
# Misc
elif cmd == "AT+CMEE=1":
await self._send_ok(conn)
elif cmd.startswith("AT+NREC="):
await self._send_ok(conn) # Noise reduction - acknowledge
elif cmd.startswith("AT+BTRH"):
await self._send_ok(conn)
elif cmd.startswith("AT+BIND"):
await self._handle_bind(conn, line)
elif cmd.startswith("AT+XAPL="):
await self._handle_xapl(conn, line)
elif cmd == "AT":
await self._send_ok(conn)
else:
log.warning("HFP AG: unknown command from %s: %s", conn.address, line)
await self._send_error(conn)
# ---- SLC negotiation handlers ----
async def _handle_brsf(self, conn: HFPConnection, line: str) -> None:
"""AT+BRSF=<features> — exchange supported features."""
try:
conn.hf_features = int(line.split("=")[1].strip())
except (IndexError, ValueError):
conn.hf_features = 0
await self._send(conn, f"+BRSF: {HFP_AG_FEATURES}")
await self._send_ok(conn)
async def _handle_cind_test(self, conn: HFPConnection) -> None:
"""AT+CIND=? — report indicator mapping."""
parts = []
for name, (lo, hi), _ in INDICATORS:
parts.append(f'("{name}",({lo},{hi}))')
await self._send(conn, "+CIND: " + ",".join(parts))
await self._send_ok(conn)
async def _handle_cind_read(self, conn: HFPConnection) -> None:
"""AT+CIND? — report current indicator values."""
vals = ",".join(str(v) for v in self.indicator_values)
await self._send(conn, f"+CIND: {vals}")
await self._send_ok(conn)
async def _handle_cmer(self, conn: HFPConnection, line: str) -> None:
"""AT+CMER=3,0,0,1 — enable indicator status reporting. SLC done."""
conn.slc_established = True
await self._send_ok(conn)
log.info("HFP AG: SLC established with %s", conn.address)
self._emit("hfp_ag_slc_established", {
"address": conn.address,
"hf_features": conn.hf_features,
})
async def _handle_chld_test(self, conn: HFPConnection) -> None:
"""AT+CHLD=? — report call hold/multiparty capabilities."""
await self._send(conn, "+CHLD: (0,1,2,3)")
await self._send_ok(conn)
async def _handle_bac(self, conn: HFPConnection, line: str) -> None:
"""AT+BAC=<codec_ids> — available codecs from HF."""
# 1=CVSD, 2=mSBC
await self._send_ok(conn)
# If WBS supported, select mSBC (codec 2)
if "2" in line:
await self._send(conn, "+BCS: 2")
async def _handle_bcs(self, conn: HFPConnection, line: str) -> None:
"""AT+BCS=<codec_id> — codec confirmation from HF."""
try:
codec_id = int(line.split("=")[1].strip())
conn.codec = "msbc" if codec_id == 2 else "cvsd"
except (IndexError, ValueError):
conn.codec = "cvsd"
await self._send_ok(conn)
log.info("HFP AG: codec selected: %s with %s", conn.codec, conn.address)
# ---- Call control handlers ----
async def _handle_answer(self, conn: HFPConnection) -> None:
"""ATA — HF answers incoming call."""
incoming = [c for c in conn.calls if c.state == CallState.INCOMING]
if incoming:
call = incoming[0]
call.state = CallState.ACTIVE
await self._send_ok(conn)
# Update indicators: call=1, callsetup=0
await self._update_indicator(conn, 2, 1) # call active
await self._update_indicator(conn, 3, 0) # callsetup none
self._emit("hfp_ag_call_answered", {
"address": conn.address,
"number": call.number,
})
# Stop ring task
if conn._ring_task and not conn._ring_task.done():
conn._ring_task.cancel()
conn._ring_task = None
else:
await self._send_error(conn)
async def _handle_hangup(self, conn: HFPConnection) -> None:
"""AT+CHUP — HF hangs up / rejects call."""
active = [c for c in conn.calls if c.state in (CallState.ACTIVE, CallState.INCOMING, CallState.OUTGOING, CallState.ALERTING)]
if active:
call = active[0]
old_state = call.state
conn.calls.remove(call)
await self._send_ok(conn)
# Update indicators
if old_state == CallState.ACTIVE:
await self._update_indicator(conn, 2, 0) # call inactive
elif old_state == CallState.INCOMING:
await self._update_indicator(conn, 3, 0) # callsetup none
self._emit("hfp_ag_call_ended", {
"address": conn.address,
"number": call.number,
"reason": "hangup",
})
# Stop ring task
if conn._ring_task and not conn._ring_task.done():
conn._ring_task.cancel()
conn._ring_task = None
else:
await self._send_ok(conn)
async def _handle_dial(self, conn: HFPConnection, line: str) -> None:
"""ATD<number>; — HF initiates outgoing call."""
number = line[3:].rstrip(";").strip()
call = CallInfo(
index=len(conn.calls) + 1,
direction=CallDirection.OUTGOING,
state=CallState.OUTGOING,
number=number,
)
conn.calls.append(call)
await self._send_ok(conn)
await self._update_indicator(conn, 3, 2) # callsetup=outgoing
self._emit("hfp_ag_outgoing_call", {
"address": conn.address,
"number": number,
})
async def _handle_dtmf(self, conn: HFPConnection, line: str) -> None:
"""AT+DTMF=<code> — HF sends DTMF tone."""
try:
code = line.split("=")[1].strip()
except IndexError:
code = ""
await self._send_ok(conn)
self._emit("hfp_ag_dtmf", {"address": conn.address, "code": code})
# ---- Volume handlers ----
async def _handle_vgs(self, conn: HFPConnection, line: str) -> None:
"""AT+VGS=<level> — HF reports speaker volume."""
try:
conn.speaker_volume = int(line.split("=")[1].strip())
except (IndexError, ValueError):
pass
await self._send_ok(conn)
self._emit("hfp_ag_volume", {
"address": conn.address,
"type": "speaker",
"level": conn.speaker_volume,
})
async def _handle_vgm(self, conn: HFPConnection, line: str) -> None:
"""AT+VGM=<level> — HF reports microphone volume."""
try:
conn.mic_volume = int(line.split("=")[1].strip())
except (IndexError, ValueError):
pass
await self._send_ok(conn)
self._emit("hfp_ag_volume", {
"address": conn.address,
"type": "microphone",
"level": conn.mic_volume,
})
# ---- Status queries ----
async def _handle_clcc(self, conn: HFPConnection) -> None:
"""AT+CLCC — list current calls."""
for call in conn.calls:
state_map = {
CallState.ACTIVE: 0,
CallState.HELD: 1,
CallState.OUTGOING: 2,
CallState.ALERTING: 3,
CallState.INCOMING: 4,
}
stat = state_map.get(call.state, 0)
await self._send(
conn,
f"+CLCC: {call.index},{call.direction.value},{stat},0,0"
+ (f',"{call.number}",{call.number_type}' if call.number else ""),
)
await self._send_ok(conn)
async def _handle_cops(self, conn: HFPConnection, line: str) -> None:
"""AT+COPS — operator name query."""
if "?" in line:
await self._send(conn, '+COPS: 0,0,"mcbluetooth"')
await self._send_ok(conn)
async def _handle_cnum(self, conn: HFPConnection) -> None:
"""AT+CNUM — subscriber number."""
await self._send(conn, '+CNUM: ,"5551234567",129,,4')
await self._send_ok(conn)
async def _handle_bvra(self, conn: HFPConnection, line: str) -> None:
"""AT+BVRA=<state> — voice recognition."""
try:
state = int(line.split("=")[1].strip())
except (IndexError, ValueError):
state = 0
await self._send_ok(conn)
self._emit("hfp_ag_voice_recognition", {
"address": conn.address,
"active": bool(state),
})
async def _handle_bind(self, conn: HFPConnection, line: str) -> None:
"""AT+BIND — HF indicators."""
if "=?" in line:
await self._send(conn, "+BIND: (1,2)") # enhanced safety, battery
await self._send_ok(conn)
elif "?" in line:
await self._send(conn, "+BIND: 1,1")
await self._send(conn, "+BIND: 2,1")
await self._send_ok(conn)
elif "=" in line:
await self._send_ok(conn)
else:
await self._send_ok(conn)
async def _handle_xapl(self, conn: HFPConnection, line: str) -> None:
"""AT+XAPL= — Apple-specific extension. Acknowledge."""
await self._send(conn, "+XAPL=iPhone,7")
await self._send_ok(conn)
# ==================== AG-initiated actions ====================
async def _update_indicator(self, conn: HFPConnection, index: int, value: int) -> None:
"""Send +CIEV indicator update to HF (1-based index)."""
if 1 <= index <= len(self.indicator_values):
self.indicator_values[index - 1] = value
if conn.slc_established:
await self._send(conn, f"+CIEV: {index},{value}")
async def simulate_incoming_call(
self, address: str, number: str = "5551234567", number_type: int = 129
) -> bool:
"""Simulate an incoming call from the AG to the HF device.
Returns True if the call was initiated successfully.
"""
conn = self._get_connection(address)
if not conn or not conn.slc_established:
return False
call = CallInfo(
index=len(conn.calls) + 1,
direction=CallDirection.INCOMING,
state=CallState.INCOMING,
number=number,
number_type=number_type,
)
conn.calls.append(call)
# Update callsetup indicator: 1 = incoming
await self._update_indicator(conn, 3, 1)
# Start ringing
async def ring_loop():
try:
while call.state == CallState.INCOMING:
await self._send(conn, "RING")
await self._send(conn, f'+CLIP: "{number}",{number_type}')
await asyncio.sleep(3.0)
except asyncio.CancelledError:
pass
except Exception:
pass
conn._ring_task = asyncio.get_event_loop().create_task(ring_loop())
self._emit("hfp_ag_incoming_call", {
"address": address,
"number": number,
})
return True
async def simulate_call_end(self, address: str) -> bool:
"""End any active or ringing call from AG side."""
conn = self._get_connection(address)
if not conn:
return False
active = [c for c in conn.calls if c.state in (CallState.ACTIVE, CallState.INCOMING)]
if not active:
return False
call = active[0]
was_active = call.state == CallState.ACTIVE
conn.calls.remove(call)
if conn._ring_task and not conn._ring_task.done():
conn._ring_task.cancel()
conn._ring_task = None
if was_active:
await self._update_indicator(conn, 2, 0) # call inactive
await self._update_indicator(conn, 3, 0) # callsetup none
self._emit("hfp_ag_call_ended", {
"address": address,
"number": call.number,
"reason": "ag_hangup",
})
return True
async def set_signal_strength(self, address: str, level: int) -> bool:
"""Update signal strength indicator (0-5)."""
conn = self._get_connection(address)
if not conn:
return False
level = max(0, min(5, level))
await self._update_indicator(conn, 5, level)
return True
async def set_battery_level(self, address: str, level: int) -> bool:
"""Update battery level indicator (0-5)."""
conn = self._get_connection(address)
if not conn:
return False
level = max(0, min(5, level))
await self._update_indicator(conn, 7, level)
return True
async def set_speaker_volume(self, address: str, level: int) -> bool:
"""Send volume change to HF device (0-15)."""
conn = self._get_connection(address)
if not conn or not conn.slc_established:
return False
level = max(0, min(15, level))
conn.speaker_volume = level
await self._send(conn, f"+VGS: {level}")
return True
async def set_mic_volume(self, address: str, level: int) -> bool:
"""Send mic volume change to HF device (0-15)."""
conn = self._get_connection(address)
if not conn or not conn.slc_established:
return False
level = max(0, min(15, level))
conn.mic_volume = level
await self._send(conn, f"+VGM: {level}")
return True
def get_connection(self, address: str) -> HFPConnection | None:
"""Get connection by address (public API)."""
return self._get_connection(address)
def _get_connection(self, address: str) -> HFPConnection | None:
for conn in self.connections.values():
if conn.address.upper() == address.upper():
return conn
return None
def get_status(self) -> dict[str, Any]:
"""Get overall HFP AG status."""
conns = []
for conn in self.connections.values():
calls = []
for call in conn.calls:
calls.append({
"index": call.index,
"direction": call.direction.name.lower(),
"state": call.state.value,
"number": call.number,
})
conns.append({
"address": conn.address,
"slc_established": conn.slc_established,
"codec": conn.codec,
"speaker_volume": conn.speaker_volume,
"mic_volume": conn.mic_volume,
"calls": calls,
})
return {
"registered": _profile_registered,
"connections": conns,
"indicators": {
ind[0]: self.indicator_values[i]
for i, ind in enumerate(INDICATORS)
},
}
# ==================== Module-level lifecycle ====================
_profile: HFPAudioGatewayProfile | None = None
_profile_bus: MessageBus | None = None
_profile_registered: bool = False
async def enable_hfp_ag() -> HFPAudioGatewayProfile:
"""Register the HFP AG profile with BlueZ."""
global _profile, _profile_bus, _profile_registered
if _profile_registered and _profile:
return _profile
if _profile is None:
_profile = HFPAudioGatewayProfile()
if _profile_bus is None:
_profile_bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
_profile_bus.export(HFP_AG_PROFILE_PATH, _profile)
# Register with ProfileManager1
introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection)
profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE)
options = {
"Name": Variant("s", "mcbluetooth HFP AG"),
"Role": Variant("s", "server"),
"Channel": Variant("q", 13), # RFCOMM channel for HFP
"Features": Variant("q", HFP_AG_FEATURES & 0xFFFF),
"Version": Variant("q", 0x0107), # HFP 1.7
}
try:
await profile_mgr.call_register_profile(
HFP_AG_PROFILE_PATH,
HFP_AG_UUID,
options,
)
_profile_registered = True
log.info("HFP AG profile registered with BlueZ")
except Exception as e:
if "Already Exists" in str(e):
_profile_registered = True
log.info("HFP AG profile was already registered")
else:
raise
return _profile
async def disable_hfp_ag() -> None:
"""Unregister the HFP AG profile."""
global _profile, _profile_bus, _profile_registered
if _profile_bus and _profile_registered:
try:
introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection)
profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE)
await profile_mgr.call_unregister_profile(HFP_AG_PROFILE_PATH)
except Exception:
pass
_profile_registered = False
if _profile:
_profile.Release()
_profile = None
if _profile_bus:
_profile_bus.disconnect()
_profile_bus = None
async def get_hfp_ag() -> HFPAudioGatewayProfile | None:
"""Get the current HFP AG profile instance (None if not enabled)."""
return _profile

View File

@ -3,7 +3,7 @@
from fastmcp import FastMCP from fastmcp import FastMCP
from mcbluetooth import resources from mcbluetooth import resources
from mcbluetooth.tools import adapter, audio, ble, device, monitor, obex from mcbluetooth.tools import adapter, audio, ble, device, hfp, monitor, obex
mcp = FastMCP( mcp = FastMCP(
name="mcbluetooth", name="mcbluetooth",
@ -43,6 +43,7 @@ resources.register_resources(mcp)
adapter.register_tools(mcp) adapter.register_tools(mcp)
device.register_tools(mcp) device.register_tools(mcp)
audio.register_tools(mcp) audio.register_tools(mcp)
hfp.register_tools(mcp)
ble.register_tools(mcp) ble.register_tools(mcp)
monitor.register_tools(mcp) monitor.register_tools(mcp)
obex.register_tools(mcp) obex.register_tools(mcp)

View File

@ -0,0 +1,212 @@
"""HFP Audio Gateway tools for Bluetooth MCP server.
These tools let Linux act as a phone (Audio Gateway) for testing Bluetooth
headsets and hands-free devices. The ESP32 test harness connects as the
Hands-Free Unit (headset), and these tools simulate call control from the
phone side.
Typical E2E test flow:
1. bt_hfp_ag_enable() Register AG profile with BlueZ
2. ESP32 connects as HF SLC auto-negotiated
3. bt_hfp_ag_simulate_call() Send RING + CLIP to ESP32
4. ESP32 answers (ATA) Call becomes active
5. bt_hfp_ag_end_call() Terminate call
"""
from __future__ import annotations
from typing import Any, Literal
from fastmcp import FastMCP
from mcbluetooth.hfp_ag import (
disable_hfp_ag,
enable_hfp_ag,
get_hfp_ag,
)
def register_tools(mcp: FastMCP) -> None:
"""Register HFP AG tools with the MCP server."""
@mcp.tool()
async def bt_hfp_ag_enable() -> dict[str, Any]:
"""Enable HFP Audio Gateway mode on Linux.
Registers a custom HFP AG profile with BlueZ via ProfileManager1.
After enabling, Bluetooth headsets (HF devices) can connect and
Linux will act as the phone side.
The SLC (Service Level Connection) is auto-negotiated when an HF
device connects feature exchange, indicator setup, and codec
selection happen automatically.
Returns:
Status confirming AG profile registration.
"""
try:
await enable_hfp_ag()
return {"status": "ok", "role": "audio_gateway", "profile": "HFP AG 1.7"}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_hfp_ag_disable() -> dict[str, Any]:
"""Disable HFP Audio Gateway mode.
Unregisters the AG profile, disconnecting any active HFP sessions.
Returns:
Status confirming AG profile removal.
"""
try:
await disable_hfp_ag()
return {"status": "ok", "disabled": True}
except Exception as exc:
return {"status": "error", "error": str(exc)}
@mcp.tool()
async def bt_hfp_ag_status() -> dict[str, Any]:
"""Get HFP Audio Gateway status.
Returns:
- registered: Whether the AG profile is active
- connections: List of connected HF devices with SLC state,
codec, volumes, and active calls
- indicators: Current indicator values (service, call, signal, etc.)
"""
profile = await get_hfp_ag()
if not profile:
return {"status": "ok", "registered": False, "connections": []}
return {"status": "ok", **profile.get_status()}
@mcp.tool()
async def bt_hfp_ag_simulate_call(
address: str,
number: str = "5551234567",
) -> dict[str, Any]:
"""Simulate an incoming call to a connected HF device.
Sends RING and +CLIP (caller ID) notifications to the headset.
The HF device will see an incoming call and can answer (ATA) or
reject (AT+CHUP).
Ringing repeats every 3 seconds until answered or ended.
Args:
address: Bluetooth address of the connected HF device.
number: Caller phone number to display.
Returns:
Status of call initiation.
"""
profile = await get_hfp_ag()
if not profile:
return {"status": "error", "error": "HFP AG not enabled"}
ok = await profile.simulate_incoming_call(address, number=number)
if ok:
return {"status": "ok", "call_state": "ringing", "number": number}
return {
"status": "error",
"error": "No SLC connection to device (connect HF first)",
}
@mcp.tool()
async def bt_hfp_ag_end_call(address: str) -> dict[str, Any]:
"""End an active or ringing call from the AG side.
Terminates the current call and sends indicator updates to the
HF device.
Args:
address: Bluetooth address of the connected HF device.
Returns:
Status of call termination.
"""
profile = await get_hfp_ag()
if not profile:
return {"status": "error", "error": "HFP AG not enabled"}
ok = await profile.simulate_call_end(address)
if ok:
return {"status": "ok", "call_state": "ended"}
return {"status": "error", "error": "No active call to end"}
@mcp.tool()
async def bt_hfp_ag_set_volume(
address: str,
type: Literal["speaker", "microphone"],
level: int,
) -> dict[str, Any]:
"""Set speaker or microphone volume on the HF device.
Sends +VGS (speaker) or +VGM (mic) to change volume remotely.
Args:
address: Bluetooth address of the connected HF device.
type: "speaker" for output volume, "microphone" for input.
level: Volume level 0-15 (0 = muted, 15 = maximum).
Returns:
Status with applied volume level.
"""
profile = await get_hfp_ag()
if not profile:
return {"status": "error", "error": "HFP AG not enabled"}
if type == "speaker":
ok = await profile.set_speaker_volume(address, level)
else:
ok = await profile.set_mic_volume(address, level)
if ok:
return {"status": "ok", "type": type, "level": level}
return {"status": "error", "error": "No SLC connection to device"}
@mcp.tool()
async def bt_hfp_ag_set_signal(
address: str,
level: int,
) -> dict[str, Any]:
"""Update the signal strength indicator shown on the HF device.
Args:
address: Bluetooth address of the connected HF device.
level: Signal strength 0-5.
Returns:
Status with applied signal level.
"""
profile = await get_hfp_ag()
if not profile:
return {"status": "error", "error": "HFP AG not enabled"}
ok = await profile.set_signal_strength(address, level)
if ok:
return {"status": "ok", "signal_strength": level}
return {"status": "error", "error": "No SLC connection to device"}
@mcp.tool()
async def bt_hfp_ag_set_battery(
address: str,
level: int,
) -> dict[str, Any]:
"""Update the battery level indicator shown on the HF device.
Args:
address: Bluetooth address of the connected HF device.
level: Battery level 0-5.
Returns:
Status with applied battery level.
"""
profile = await get_hfp_ag()
if not profile:
return {"status": "error", "error": "HFP AG not enabled"}
ok = await profile.set_battery_level(address, level)
if ok:
return {"status": "ok", "battery_level": level}
return {"status": "error", "error": "No SLC connection to device"}