diff --git a/src/mcbluetooth/hfp_ag.py b/src/mcbluetooth/hfp_ag.py new file mode 100644 index 0000000..5aa3bba --- /dev/null +++ b/src/mcbluetooth/hfp_ag.py @@ -0,0 +1,767 @@ +"""HFP Audio Gateway implementation for BlueZ. + +Registers as an HFP AG (phone role) via BlueZ ProfileManager1, then speaks +the HFP AT command protocol over the RFCOMM socket handed to us by BlueZ +when an HF device (headset) connects. + +This allows Linux to simulate a phone for E2E testing of Bluetooth headsets +(like our ESP32 test harness acting as a Hands-Free Unit). + +HFP AT command flow (simplified): + 1. HF connects → BlueZ calls NewConnection with RFCOMM fd + 2. SLC negotiation: BRSF features, CIND indicators, CMER enable + 3. AG can then simulate: RING (incoming call), +CLIP (caller ID) + 4. HF responds: ATA (answer), AT+CHUP (hangup), AT+VGS (volume) +""" + +import asyncio +import logging +import os +import socket +from dataclasses import dataclass, field +from enum import Enum, IntFlag +from typing import Any + +from dbus_fast import BusType, Variant +from dbus_fast.aio import MessageBus +from dbus_fast.service import ServiceInterface, method + +log = logging.getLogger(__name__) + +# D-Bus constants +BLUEZ_SERVICE = "org.bluez" +PROFILE_MANAGER_IFACE = "org.bluez.ProfileManager1" +HFP_AG_UUID = "0000111f-0000-1000-8000-00805f9b34fb" +HFP_AG_PROFILE_PATH = "/mcbluetooth/hfp_ag" + +# HFP 1.7 AG feature flags +HFP_AG_FEATURES = ( + (1 << 0) # Three-way calling + | (1 << 1) # EC/NR function + | (1 << 2) # Voice recognition + | (1 << 3) # In-band ring tone + | (1 << 4) # Voice tag + | (1 << 5) # Reject call + | (1 << 6) # Enhanced call status + | (1 << 7) # Enhanced call control + | (1 << 8) # Extended error result codes + | (1 << 9) # Codec negotiation +) + + +class CallState(Enum): + IDLE = "idle" + INCOMING = "incoming" # ringing + OUTGOING = "outgoing" # dialing + ALERTING = "alerting" # remote ringing + ACTIVE = "active" # in call + HELD = "held" + + +class CallDirection(Enum): + OUTGOING = 0 + INCOMING = 1 + + +@dataclass +class CallInfo: + index: int + direction: CallDirection + state: CallState + number: str = "" + number_type: int = 129 # 129=unknown, 145=international + + +@dataclass +class HFPConnection: + """State for a single HFP connection to an HF device.""" + device_path: str + address: str + fd: int + sock: socket.socket | None = None + reader: asyncio.StreamReader | None = None + writer: asyncio.StreamWriter | None = None + slc_established: bool = False + codec: str = "cvsd" # cvsd or msbc + hf_features: int = 0 + speaker_volume: int = 7 + mic_volume: int = 7 + calls: list[CallInfo] = field(default_factory=list) + _read_task: asyncio.Task | None = field(default=None, repr=False) + _ring_task: asyncio.Task | None = field(default=None, repr=False) + + +# HFP indicator definitions (order matters — index referenced by +CIEV) +# (name, range, initial_value) +INDICATORS = [ + ("service", (0, 1), 1), # 1: network available + ("call", (0, 1), 0), # 2: active call + ("callsetup", (0, 3), 0), # 3: 0=none, 1=incoming, 2=outgoing, 3=alerting + ("callheld", (0, 2), 0), # 4: 0=none, 1=held+active, 2=held only + ("signal", (0, 5), 5), # 5: signal strength + ("roam", (0, 1), 0), # 6: roaming + ("battchg", (0, 5), 5), # 7: battery level +] + + +def _path_to_address(device_path: str) -> str: + parts = device_path.split("/") + if len(parts) >= 5 and parts[-1].startswith("dev_"): + return parts[-1][4:].replace("_", ":") + return device_path + + +class HFPAudioGatewayProfile(ServiceInterface): + """D-Bus Profile1 service for HFP Audio Gateway role.""" + + def __init__(self) -> None: + super().__init__("org.bluez.Profile1") + self.connections: dict[str, HFPConnection] = {} # keyed by device_path + self.indicator_values = [ind[2] for ind in INDICATORS] + self._event_callbacks: list[Any] = [] + + def on_event(self, callback): + """Register a callback for HFP events: callback(event_type, data).""" + self._event_callbacks.append(callback) + + def _emit(self, event_type: str, data: dict[str, Any]) -> None: + for cb in self._event_callbacks: + try: + cb(event_type, data) + except Exception: + pass + + @method() + def Release(self) -> None: + log.info("HFP AG profile released") + for conn in list(self.connections.values()): + self._cleanup_connection(conn) + self.connections.clear() + + @method() + def NewConnection(self, device: "o", fd: "h", properties: "a{sv}") -> None: + address = _path_to_address(device) + log.info("HFP AG: new connection from %s (fd=%d)", address, fd) + + # Duplicate the fd so we own it + new_fd = os.dup(fd) + conn = HFPConnection( + device_path=device, + address=address, + fd=new_fd, + ) + self.connections[device] = conn + + # Start async AT command handler + loop = asyncio.get_event_loop() + conn._read_task = loop.create_task(self._handle_connection(conn)) + + self._emit("hfp_ag_connect", {"address": address}) + + @method() + def RequestDisconnection(self, device: "o") -> None: + address = _path_to_address(device) + log.info("HFP AG: disconnect requested for %s", address) + conn = self.connections.pop(device, None) + if conn: + self._cleanup_connection(conn) + self._emit("hfp_ag_disconnect", {"address": address}) + + def _cleanup_connection(self, conn: HFPConnection) -> None: + if conn._ring_task and not conn._ring_task.done(): + conn._ring_task.cancel() + if conn._read_task and not conn._read_task.done(): + conn._read_task.cancel() + if conn.writer: + try: + conn.writer.close() + except Exception: + pass + if conn.sock: + try: + conn.sock.close() + except Exception: + pass + else: + try: + os.close(conn.fd) + except Exception: + pass + + # ==================== AT Command Protocol ==================== + + async def _handle_connection(self, conn: HFPConnection) -> None: + """Read loop for AT commands from the HF device.""" + try: + conn.sock = socket.fromfd(conn.fd, socket.AF_BLUETOOTH, socket.SOCK_STREAM) + conn.sock.setblocking(False) + + loop = asyncio.get_event_loop() + conn.reader, conn.writer = await asyncio.open_connection(sock=conn.sock) + + buf = b"" + while True: + data = await conn.reader.read(1024) + if not data: + break + buf += data + + while b"\r" in buf: + line, buf = buf.split(b"\r", 1) + # Strip leading \n if present + line = line.lstrip(b"\n").strip() + if line: + await self._process_at_command(conn, line.decode("utf-8", errors="replace")) + + except (ConnectionResetError, BrokenPipeError, OSError): + log.debug("HFP AG: connection closed for %s", conn.address) + except asyncio.CancelledError: + pass + except Exception: + log.exception("HFP AG: error handling connection for %s", conn.address) + finally: + self.connections.pop(conn.device_path, None) + self._cleanup_connection(conn) + self._emit("hfp_ag_disconnect", {"address": conn.address}) + + async def _send(self, conn: HFPConnection, response: str) -> None: + """Send an AT response to the HF device.""" + if conn.writer and not conn.writer.is_closing(): + conn.writer.write(f"\r\n{response}\r\n".encode()) + await conn.writer.drain() + log.debug("HFP AG → %s: %s", conn.address, response) + + async def _send_ok(self, conn: HFPConnection) -> None: + await self._send(conn, "OK") + + async def _send_error(self, conn: HFPConnection) -> None: + await self._send(conn, "ERROR") + + async def _process_at_command(self, conn: HFPConnection, line: str) -> None: + """Parse and handle an AT command from the HF.""" + log.debug("HFP AG ← %s: %s", conn.address, line) + cmd = line.strip().upper() + + self._emit("hfp_ag_at_recv", {"address": conn.address, "command": line.strip()}) + + # SLC establishment commands + if cmd.startswith("AT+BRSF="): + await self._handle_brsf(conn, line) + elif cmd == "AT+CIND=?": + await self._handle_cind_test(conn) + elif cmd == "AT+CIND?": + await self._handle_cind_read(conn) + elif cmd.startswith("AT+CMER="): + await self._handle_cmer(conn, line) + elif cmd.startswith("AT+CHLD=?"): + await self._handle_chld_test(conn) + + # Codec negotiation + elif cmd.startswith("AT+BAC="): + await self._handle_bac(conn, line) + elif cmd.startswith("AT+BCS="): + await self._handle_bcs(conn, line) + + # Call control + elif cmd == "ATA": + await self._handle_answer(conn) + elif cmd == "AT+CHUP": + await self._handle_hangup(conn) + elif cmd.startswith("ATD"): + await self._handle_dial(conn, line) + elif cmd.startswith("AT+DTMF="): + await self._handle_dtmf(conn, line) + + # Volume + elif cmd.startswith("AT+VGS="): + await self._handle_vgs(conn, line) + elif cmd.startswith("AT+VGM="): + await self._handle_vgm(conn, line) + + # Status queries + elif cmd.startswith("AT+CLCC"): + await self._handle_clcc(conn) + elif cmd.startswith("AT+COPS"): + await self._handle_cops(conn, line) + elif cmd.startswith("AT+CNUM"): + await self._handle_cnum(conn) + + # Voice recognition + elif cmd.startswith("AT+BVRA="): + await self._handle_bvra(conn, line) + + # Misc + elif cmd == "AT+CMEE=1": + await self._send_ok(conn) + elif cmd.startswith("AT+NREC="): + await self._send_ok(conn) # Noise reduction - acknowledge + elif cmd.startswith("AT+BTRH"): + await self._send_ok(conn) + elif cmd.startswith("AT+BIND"): + await self._handle_bind(conn, line) + elif cmd.startswith("AT+XAPL="): + await self._handle_xapl(conn, line) + elif cmd == "AT": + await self._send_ok(conn) + else: + log.warning("HFP AG: unknown command from %s: %s", conn.address, line) + await self._send_error(conn) + + # ---- SLC negotiation handlers ---- + + async def _handle_brsf(self, conn: HFPConnection, line: str) -> None: + """AT+BRSF= — exchange supported features.""" + try: + conn.hf_features = int(line.split("=")[1].strip()) + except (IndexError, ValueError): + conn.hf_features = 0 + await self._send(conn, f"+BRSF: {HFP_AG_FEATURES}") + await self._send_ok(conn) + + async def _handle_cind_test(self, conn: HFPConnection) -> None: + """AT+CIND=? — report indicator mapping.""" + parts = [] + for name, (lo, hi), _ in INDICATORS: + parts.append(f'("{name}",({lo},{hi}))') + await self._send(conn, "+CIND: " + ",".join(parts)) + await self._send_ok(conn) + + async def _handle_cind_read(self, conn: HFPConnection) -> None: + """AT+CIND? — report current indicator values.""" + vals = ",".join(str(v) for v in self.indicator_values) + await self._send(conn, f"+CIND: {vals}") + await self._send_ok(conn) + + async def _handle_cmer(self, conn: HFPConnection, line: str) -> None: + """AT+CMER=3,0,0,1 — enable indicator status reporting. SLC done.""" + conn.slc_established = True + await self._send_ok(conn) + log.info("HFP AG: SLC established with %s", conn.address) + self._emit("hfp_ag_slc_established", { + "address": conn.address, + "hf_features": conn.hf_features, + }) + + async def _handle_chld_test(self, conn: HFPConnection) -> None: + """AT+CHLD=? — report call hold/multiparty capabilities.""" + await self._send(conn, "+CHLD: (0,1,2,3)") + await self._send_ok(conn) + + async def _handle_bac(self, conn: HFPConnection, line: str) -> None: + """AT+BAC= — available codecs from HF.""" + # 1=CVSD, 2=mSBC + await self._send_ok(conn) + # If WBS supported, select mSBC (codec 2) + if "2" in line: + await self._send(conn, "+BCS: 2") + + async def _handle_bcs(self, conn: HFPConnection, line: str) -> None: + """AT+BCS= — codec confirmation from HF.""" + try: + codec_id = int(line.split("=")[1].strip()) + conn.codec = "msbc" if codec_id == 2 else "cvsd" + except (IndexError, ValueError): + conn.codec = "cvsd" + await self._send_ok(conn) + log.info("HFP AG: codec selected: %s with %s", conn.codec, conn.address) + + # ---- Call control handlers ---- + + async def _handle_answer(self, conn: HFPConnection) -> None: + """ATA — HF answers incoming call.""" + incoming = [c for c in conn.calls if c.state == CallState.INCOMING] + if incoming: + call = incoming[0] + call.state = CallState.ACTIVE + await self._send_ok(conn) + # Update indicators: call=1, callsetup=0 + await self._update_indicator(conn, 2, 1) # call active + await self._update_indicator(conn, 3, 0) # callsetup none + self._emit("hfp_ag_call_answered", { + "address": conn.address, + "number": call.number, + }) + # Stop ring task + if conn._ring_task and not conn._ring_task.done(): + conn._ring_task.cancel() + conn._ring_task = None + else: + await self._send_error(conn) + + async def _handle_hangup(self, conn: HFPConnection) -> None: + """AT+CHUP — HF hangs up / rejects call.""" + active = [c for c in conn.calls if c.state in (CallState.ACTIVE, CallState.INCOMING, CallState.OUTGOING, CallState.ALERTING)] + if active: + call = active[0] + old_state = call.state + conn.calls.remove(call) + await self._send_ok(conn) + # Update indicators + if old_state == CallState.ACTIVE: + await self._update_indicator(conn, 2, 0) # call inactive + elif old_state == CallState.INCOMING: + await self._update_indicator(conn, 3, 0) # callsetup none + self._emit("hfp_ag_call_ended", { + "address": conn.address, + "number": call.number, + "reason": "hangup", + }) + # Stop ring task + if conn._ring_task and not conn._ring_task.done(): + conn._ring_task.cancel() + conn._ring_task = None + else: + await self._send_ok(conn) + + async def _handle_dial(self, conn: HFPConnection, line: str) -> None: + """ATD; — HF initiates outgoing call.""" + number = line[3:].rstrip(";").strip() + call = CallInfo( + index=len(conn.calls) + 1, + direction=CallDirection.OUTGOING, + state=CallState.OUTGOING, + number=number, + ) + conn.calls.append(call) + await self._send_ok(conn) + await self._update_indicator(conn, 3, 2) # callsetup=outgoing + self._emit("hfp_ag_outgoing_call", { + "address": conn.address, + "number": number, + }) + + async def _handle_dtmf(self, conn: HFPConnection, line: str) -> None: + """AT+DTMF= — HF sends DTMF tone.""" + try: + code = line.split("=")[1].strip() + except IndexError: + code = "" + await self._send_ok(conn) + self._emit("hfp_ag_dtmf", {"address": conn.address, "code": code}) + + # ---- Volume handlers ---- + + async def _handle_vgs(self, conn: HFPConnection, line: str) -> None: + """AT+VGS= — HF reports speaker volume.""" + try: + conn.speaker_volume = int(line.split("=")[1].strip()) + except (IndexError, ValueError): + pass + await self._send_ok(conn) + self._emit("hfp_ag_volume", { + "address": conn.address, + "type": "speaker", + "level": conn.speaker_volume, + }) + + async def _handle_vgm(self, conn: HFPConnection, line: str) -> None: + """AT+VGM= — HF reports microphone volume.""" + try: + conn.mic_volume = int(line.split("=")[1].strip()) + except (IndexError, ValueError): + pass + await self._send_ok(conn) + self._emit("hfp_ag_volume", { + "address": conn.address, + "type": "microphone", + "level": conn.mic_volume, + }) + + # ---- Status queries ---- + + async def _handle_clcc(self, conn: HFPConnection) -> None: + """AT+CLCC — list current calls.""" + for call in conn.calls: + state_map = { + CallState.ACTIVE: 0, + CallState.HELD: 1, + CallState.OUTGOING: 2, + CallState.ALERTING: 3, + CallState.INCOMING: 4, + } + stat = state_map.get(call.state, 0) + await self._send( + conn, + f"+CLCC: {call.index},{call.direction.value},{stat},0,0" + + (f',"{call.number}",{call.number_type}' if call.number else ""), + ) + await self._send_ok(conn) + + async def _handle_cops(self, conn: HFPConnection, line: str) -> None: + """AT+COPS — operator name query.""" + if "?" in line: + await self._send(conn, '+COPS: 0,0,"mcbluetooth"') + await self._send_ok(conn) + + async def _handle_cnum(self, conn: HFPConnection) -> None: + """AT+CNUM — subscriber number.""" + await self._send(conn, '+CNUM: ,"5551234567",129,,4') + await self._send_ok(conn) + + async def _handle_bvra(self, conn: HFPConnection, line: str) -> None: + """AT+BVRA= — voice recognition.""" + try: + state = int(line.split("=")[1].strip()) + except (IndexError, ValueError): + state = 0 + await self._send_ok(conn) + self._emit("hfp_ag_voice_recognition", { + "address": conn.address, + "active": bool(state), + }) + + async def _handle_bind(self, conn: HFPConnection, line: str) -> None: + """AT+BIND — HF indicators.""" + if "=?" in line: + await self._send(conn, "+BIND: (1,2)") # enhanced safety, battery + await self._send_ok(conn) + elif "?" in line: + await self._send(conn, "+BIND: 1,1") + await self._send(conn, "+BIND: 2,1") + await self._send_ok(conn) + elif "=" in line: + await self._send_ok(conn) + else: + await self._send_ok(conn) + + async def _handle_xapl(self, conn: HFPConnection, line: str) -> None: + """AT+XAPL= — Apple-specific extension. Acknowledge.""" + await self._send(conn, "+XAPL=iPhone,7") + await self._send_ok(conn) + + # ==================== AG-initiated actions ==================== + + async def _update_indicator(self, conn: HFPConnection, index: int, value: int) -> None: + """Send +CIEV indicator update to HF (1-based index).""" + if 1 <= index <= len(self.indicator_values): + self.indicator_values[index - 1] = value + if conn.slc_established: + await self._send(conn, f"+CIEV: {index},{value}") + + async def simulate_incoming_call( + self, address: str, number: str = "5551234567", number_type: int = 129 + ) -> bool: + """Simulate an incoming call from the AG to the HF device. + + Returns True if the call was initiated successfully. + """ + conn = self._get_connection(address) + if not conn or not conn.slc_established: + return False + + call = CallInfo( + index=len(conn.calls) + 1, + direction=CallDirection.INCOMING, + state=CallState.INCOMING, + number=number, + number_type=number_type, + ) + conn.calls.append(call) + + # Update callsetup indicator: 1 = incoming + await self._update_indicator(conn, 3, 1) + + # Start ringing + async def ring_loop(): + try: + while call.state == CallState.INCOMING: + await self._send(conn, "RING") + await self._send(conn, f'+CLIP: "{number}",{number_type}') + await asyncio.sleep(3.0) + except asyncio.CancelledError: + pass + except Exception: + pass + + conn._ring_task = asyncio.get_event_loop().create_task(ring_loop()) + self._emit("hfp_ag_incoming_call", { + "address": address, + "number": number, + }) + return True + + async def simulate_call_end(self, address: str) -> bool: + """End any active or ringing call from AG side.""" + conn = self._get_connection(address) + if not conn: + return False + + active = [c for c in conn.calls if c.state in (CallState.ACTIVE, CallState.INCOMING)] + if not active: + return False + + call = active[0] + was_active = call.state == CallState.ACTIVE + conn.calls.remove(call) + + if conn._ring_task and not conn._ring_task.done(): + conn._ring_task.cancel() + conn._ring_task = None + + if was_active: + await self._update_indicator(conn, 2, 0) # call inactive + await self._update_indicator(conn, 3, 0) # callsetup none + + self._emit("hfp_ag_call_ended", { + "address": address, + "number": call.number, + "reason": "ag_hangup", + }) + return True + + async def set_signal_strength(self, address: str, level: int) -> bool: + """Update signal strength indicator (0-5).""" + conn = self._get_connection(address) + if not conn: + return False + level = max(0, min(5, level)) + await self._update_indicator(conn, 5, level) + return True + + async def set_battery_level(self, address: str, level: int) -> bool: + """Update battery level indicator (0-5).""" + conn = self._get_connection(address) + if not conn: + return False + level = max(0, min(5, level)) + await self._update_indicator(conn, 7, level) + return True + + async def set_speaker_volume(self, address: str, level: int) -> bool: + """Send volume change to HF device (0-15).""" + conn = self._get_connection(address) + if not conn or not conn.slc_established: + return False + level = max(0, min(15, level)) + conn.speaker_volume = level + await self._send(conn, f"+VGS: {level}") + return True + + async def set_mic_volume(self, address: str, level: int) -> bool: + """Send mic volume change to HF device (0-15).""" + conn = self._get_connection(address) + if not conn or not conn.slc_established: + return False + level = max(0, min(15, level)) + conn.mic_volume = level + await self._send(conn, f"+VGM: {level}") + return True + + def get_connection(self, address: str) -> HFPConnection | None: + """Get connection by address (public API).""" + return self._get_connection(address) + + def _get_connection(self, address: str) -> HFPConnection | None: + for conn in self.connections.values(): + if conn.address.upper() == address.upper(): + return conn + return None + + def get_status(self) -> dict[str, Any]: + """Get overall HFP AG status.""" + conns = [] + for conn in self.connections.values(): + calls = [] + for call in conn.calls: + calls.append({ + "index": call.index, + "direction": call.direction.name.lower(), + "state": call.state.value, + "number": call.number, + }) + conns.append({ + "address": conn.address, + "slc_established": conn.slc_established, + "codec": conn.codec, + "speaker_volume": conn.speaker_volume, + "mic_volume": conn.mic_volume, + "calls": calls, + }) + return { + "registered": _profile_registered, + "connections": conns, + "indicators": { + ind[0]: self.indicator_values[i] + for i, ind in enumerate(INDICATORS) + }, + } + + +# ==================== Module-level lifecycle ==================== + +_profile: HFPAudioGatewayProfile | None = None +_profile_bus: MessageBus | None = None +_profile_registered: bool = False + + +async def enable_hfp_ag() -> HFPAudioGatewayProfile: + """Register the HFP AG profile with BlueZ.""" + global _profile, _profile_bus, _profile_registered + + if _profile_registered and _profile: + return _profile + + if _profile is None: + _profile = HFPAudioGatewayProfile() + + if _profile_bus is None: + _profile_bus = await MessageBus(bus_type=BusType.SYSTEM).connect() + _profile_bus.export(HFP_AG_PROFILE_PATH, _profile) + + # Register with ProfileManager1 + introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez") + proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection) + profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE) + + options = { + "Name": Variant("s", "mcbluetooth HFP AG"), + "Role": Variant("s", "server"), + "Channel": Variant("q", 13), # RFCOMM channel for HFP + "Features": Variant("q", HFP_AG_FEATURES & 0xFFFF), + "Version": Variant("q", 0x0107), # HFP 1.7 + } + + try: + await profile_mgr.call_register_profile( + HFP_AG_PROFILE_PATH, + HFP_AG_UUID, + options, + ) + _profile_registered = True + log.info("HFP AG profile registered with BlueZ") + except Exception as e: + if "Already Exists" in str(e): + _profile_registered = True + log.info("HFP AG profile was already registered") + else: + raise + + return _profile + + +async def disable_hfp_ag() -> None: + """Unregister the HFP AG profile.""" + global _profile, _profile_bus, _profile_registered + + if _profile_bus and _profile_registered: + try: + introspection = await _profile_bus.introspect(BLUEZ_SERVICE, "/org/bluez") + proxy = _profile_bus.get_proxy_object(BLUEZ_SERVICE, "/org/bluez", introspection) + profile_mgr = proxy.get_interface(PROFILE_MANAGER_IFACE) + await profile_mgr.call_unregister_profile(HFP_AG_PROFILE_PATH) + except Exception: + pass + _profile_registered = False + + if _profile: + _profile.Release() + _profile = None + + if _profile_bus: + _profile_bus.disconnect() + _profile_bus = None + + +async def get_hfp_ag() -> HFPAudioGatewayProfile | None: + """Get the current HFP AG profile instance (None if not enabled).""" + return _profile diff --git a/src/mcbluetooth/server.py b/src/mcbluetooth/server.py index bdd44dc..9add767 100644 --- a/src/mcbluetooth/server.py +++ b/src/mcbluetooth/server.py @@ -3,7 +3,7 @@ from fastmcp import FastMCP from mcbluetooth import resources -from mcbluetooth.tools import adapter, audio, ble, device, monitor, obex +from mcbluetooth.tools import adapter, audio, ble, device, hfp, monitor, obex mcp = FastMCP( name="mcbluetooth", @@ -43,6 +43,7 @@ resources.register_resources(mcp) adapter.register_tools(mcp) device.register_tools(mcp) audio.register_tools(mcp) +hfp.register_tools(mcp) ble.register_tools(mcp) monitor.register_tools(mcp) obex.register_tools(mcp) diff --git a/src/mcbluetooth/tools/hfp.py b/src/mcbluetooth/tools/hfp.py new file mode 100644 index 0000000..aac7d84 --- /dev/null +++ b/src/mcbluetooth/tools/hfp.py @@ -0,0 +1,212 @@ +"""HFP Audio Gateway tools for Bluetooth MCP server. + +These tools let Linux act as a phone (Audio Gateway) for testing Bluetooth +headsets and hands-free devices. The ESP32 test harness connects as the +Hands-Free Unit (headset), and these tools simulate call control from the +phone side. + +Typical E2E test flow: + 1. bt_hfp_ag_enable() → Register AG profile with BlueZ + 2. ESP32 connects as HF → SLC auto-negotiated + 3. bt_hfp_ag_simulate_call() → Send RING + CLIP to ESP32 + 4. ESP32 answers (ATA) → Call becomes active + 5. bt_hfp_ag_end_call() → Terminate call +""" + +from __future__ import annotations + +from typing import Any, Literal + +from fastmcp import FastMCP + +from mcbluetooth.hfp_ag import ( + disable_hfp_ag, + enable_hfp_ag, + get_hfp_ag, +) + + +def register_tools(mcp: FastMCP) -> None: + """Register HFP AG tools with the MCP server.""" + + @mcp.tool() + async def bt_hfp_ag_enable() -> dict[str, Any]: + """Enable HFP Audio Gateway mode on Linux. + + Registers a custom HFP AG profile with BlueZ via ProfileManager1. + After enabling, Bluetooth headsets (HF devices) can connect and + Linux will act as the phone side. + + The SLC (Service Level Connection) is auto-negotiated when an HF + device connects — feature exchange, indicator setup, and codec + selection happen automatically. + + Returns: + Status confirming AG profile registration. + """ + try: + await enable_hfp_ag() + return {"status": "ok", "role": "audio_gateway", "profile": "HFP AG 1.7"} + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_hfp_ag_disable() -> dict[str, Any]: + """Disable HFP Audio Gateway mode. + + Unregisters the AG profile, disconnecting any active HFP sessions. + + Returns: + Status confirming AG profile removal. + """ + try: + await disable_hfp_ag() + return {"status": "ok", "disabled": True} + except Exception as exc: + return {"status": "error", "error": str(exc)} + + @mcp.tool() + async def bt_hfp_ag_status() -> dict[str, Any]: + """Get HFP Audio Gateway status. + + Returns: + - registered: Whether the AG profile is active + - connections: List of connected HF devices with SLC state, + codec, volumes, and active calls + - indicators: Current indicator values (service, call, signal, etc.) + """ + profile = await get_hfp_ag() + if not profile: + return {"status": "ok", "registered": False, "connections": []} + return {"status": "ok", **profile.get_status()} + + @mcp.tool() + async def bt_hfp_ag_simulate_call( + address: str, + number: str = "5551234567", + ) -> dict[str, Any]: + """Simulate an incoming call to a connected HF device. + + Sends RING and +CLIP (caller ID) notifications to the headset. + The HF device will see an incoming call and can answer (ATA) or + reject (AT+CHUP). + + Ringing repeats every 3 seconds until answered or ended. + + Args: + address: Bluetooth address of the connected HF device. + number: Caller phone number to display. + + Returns: + Status of call initiation. + """ + profile = await get_hfp_ag() + if not profile: + return {"status": "error", "error": "HFP AG not enabled"} + + ok = await profile.simulate_incoming_call(address, number=number) + if ok: + return {"status": "ok", "call_state": "ringing", "number": number} + return { + "status": "error", + "error": "No SLC connection to device (connect HF first)", + } + + @mcp.tool() + async def bt_hfp_ag_end_call(address: str) -> dict[str, Any]: + """End an active or ringing call from the AG side. + + Terminates the current call and sends indicator updates to the + HF device. + + Args: + address: Bluetooth address of the connected HF device. + + Returns: + Status of call termination. + """ + profile = await get_hfp_ag() + if not profile: + return {"status": "error", "error": "HFP AG not enabled"} + + ok = await profile.simulate_call_end(address) + if ok: + return {"status": "ok", "call_state": "ended"} + return {"status": "error", "error": "No active call to end"} + + @mcp.tool() + async def bt_hfp_ag_set_volume( + address: str, + type: Literal["speaker", "microphone"], + level: int, + ) -> dict[str, Any]: + """Set speaker or microphone volume on the HF device. + + Sends +VGS (speaker) or +VGM (mic) to change volume remotely. + + Args: + address: Bluetooth address of the connected HF device. + type: "speaker" for output volume, "microphone" for input. + level: Volume level 0-15 (0 = muted, 15 = maximum). + + Returns: + Status with applied volume level. + """ + profile = await get_hfp_ag() + if not profile: + return {"status": "error", "error": "HFP AG not enabled"} + + if type == "speaker": + ok = await profile.set_speaker_volume(address, level) + else: + ok = await profile.set_mic_volume(address, level) + + if ok: + return {"status": "ok", "type": type, "level": level} + return {"status": "error", "error": "No SLC connection to device"} + + @mcp.tool() + async def bt_hfp_ag_set_signal( + address: str, + level: int, + ) -> dict[str, Any]: + """Update the signal strength indicator shown on the HF device. + + Args: + address: Bluetooth address of the connected HF device. + level: Signal strength 0-5. + + Returns: + Status with applied signal level. + """ + profile = await get_hfp_ag() + if not profile: + return {"status": "error", "error": "HFP AG not enabled"} + + ok = await profile.set_signal_strength(address, level) + if ok: + return {"status": "ok", "signal_strength": level} + return {"status": "error", "error": "No SLC connection to device"} + + @mcp.tool() + async def bt_hfp_ag_set_battery( + address: str, + level: int, + ) -> dict[str, Any]: + """Update the battery level indicator shown on the HF device. + + Args: + address: Bluetooth address of the connected HF device. + level: Battery level 0-5. + + Returns: + Status with applied battery level. + """ + profile = await get_hfp_ag() + if not profile: + return {"status": "error", "error": "HFP AG not enabled"} + + ok = await profile.set_battery_level(address, level) + if ok: + return {"status": "ok", "battery_level": level} + return {"status": "error", "error": "No SLC connection to device"}