Fix HFP AG E2E: post-SLC handlers, fd management, DTMF routing

Multiple bugs preventing stable HFP connections with real hardware
(ESP32 Bluedroid HFU):

- Add AF_BLUETOOTH/BTPROTO_RFCOMM fallback constants for Python builds
  compiled without bluetooth.h
- Fix NewConnection fd handling: validate, os.dup, transfer to socket
  via socket.fromfd with proper protocol param, close intermediate fd
- Remove premature +BCS codec selection from AT+BAC handler — sending
  +BCS during SLC setup confuses Bluedroid's state machine
- Add post-SLC command handlers: AT+BIA (indicator activation),
  AT+CCWA (call waiting), AT+CLIP (caller line ID) — without these
  the HF drops the RFCOMM connection after ~22 seconds
- Route AT+VTS= to DTMF handler (standard HFP command, alongside
  the non-standard AT+DTMF=)
- Fix simulate_call_end to handle OUTGOING/ALERTING call states
- Respect AT+BIA flags in _update_indicator
- Only send +CLIP during RING when HF has enabled it
- Clean up debug logging: remove file-based logger, use log.debug
- Add ruff per-file-ignores for dbus-fast D-Bus type annotations

Validated: 85/86 E2E tests PASS with ESP32 HFP Hands-Free Unit
This commit is contained in:
Ryan Malloy 2026-02-03 18:38:34 -07:00
parent 2597c8b8b4
commit 0799067a1a
2 changed files with 153 additions and 24 deletions

View File

@ -56,6 +56,10 @@ target-version = "py311"
select = ["E", "F", "W", "I", "B", "UP"] select = ["E", "F", "W", "I", "B", "UP"]
ignore = ["E501"] ignore = ["E501"]
[tool.ruff.lint.per-file-ignores]
# dbus-fast uses D-Bus type signatures ("o", "h", "a{sv}") as annotations
"src/mcbluetooth/hfp_ag.py" = ["F821", "F722"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
asyncio_mode = "auto" asyncio_mode = "auto"
testpaths = ["tests"] testpaths = ["tests"]

View File

@ -19,7 +19,7 @@ import logging
import os import os
import socket import socket
from dataclasses import dataclass, field from dataclasses import dataclass, field
from enum import Enum, IntFlag from enum import Enum
from typing import Any from typing import Any
from dbus_fast import BusType, Variant from dbus_fast import BusType, Variant
@ -28,6 +28,11 @@ from dbus_fast.service import ServiceInterface, method
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
# Bluetooth socket constants — use Python's if available, else Linux values.
# Python is often compiled without bluetooth.h, so these may not exist.
_AF_BLUETOOTH = getattr(socket, "AF_BLUETOOTH", 31)
_BTPROTO_RFCOMM = getattr(socket, "BTPROTO_RFCOMM", 3)
# D-Bus constants # D-Bus constants
BLUEZ_SERVICE = "org.bluez" BLUEZ_SERVICE = "org.bluez"
PROFILE_MANAGER_IFACE = "org.bluez.ProfileManager1" PROFILE_MANAGER_IFACE = "org.bluez.ProfileManager1"
@ -87,6 +92,10 @@ class HFPConnection:
speaker_volume: int = 7 speaker_volume: int = 7
mic_volume: int = 7 mic_volume: int = 7
calls: list[CallInfo] = field(default_factory=list) calls: list[CallInfo] = field(default_factory=list)
_hf_supports_msbc: bool = False
_clip_enabled: bool = False # Caller line ID presentation
_ccwa_enabled: bool = False # Call waiting notification
_indicator_active: list[bool] = field(default_factory=list) # per-indicator activation (AT+BIA)
_read_task: asyncio.Task | None = field(default=None, repr=False) _read_task: asyncio.Task | None = field(default=None, repr=False)
_ring_task: asyncio.Task | None = field(default=None, repr=False) _ring_task: asyncio.Task | None = field(default=None, repr=False)
@ -141,10 +150,31 @@ class HFPAudioGatewayProfile(ServiceInterface):
@method() @method()
def NewConnection(self, device: "o", fd: "h", properties: "a{sv}") -> None: def NewConnection(self, device: "o", fd: "h", properties: "a{sv}") -> None:
address = _path_to_address(device) address = _path_to_address(device)
log.info("HFP AG: new connection from %s (fd=%d)", address, fd) log.debug("NewConnection called: device=%s fd=%r type=%s props=%s",
device, fd, type(fd).__name__, properties)
# Duplicate the fd so we own it if fd is None or (isinstance(fd, int) and fd < 0):
log.error("INVALID fd received: %r — D-Bus FD negotiation may have failed", fd)
return
# Validate the fd is actually usable
try:
stat = os.fstat(fd)
log.debug("fd %d fstat: mode=0o%o", fd, stat.st_mode)
except OSError as e:
log.error("fd %d fstat failed: %s", fd, e)
log.info("HFP AG: NewConnection from %s (fd=%d)", address, fd)
# Duplicate the fd so we own it independent of dbus-fast
try:
new_fd = os.dup(fd) new_fd = os.dup(fd)
log.debug("os.dup(%d) → %d", fd, new_fd)
except OSError:
log.exception("os.dup(%d) FAILED", fd)
log.exception("HFP AG: os.dup(%d) failed for %s", fd, address)
return
conn = HFPConnection( conn = HFPConnection(
device_path=device, device_path=device,
address=address, address=address,
@ -157,6 +187,7 @@ class HFPAudioGatewayProfile(ServiceInterface):
conn._read_task = loop.create_task(self._handle_connection(conn)) conn._read_task = loop.create_task(self._handle_connection(conn))
self._emit("hfp_ag_connect", {"address": address}) self._emit("hfp_ag_connect", {"address": address})
log.debug("NewConnection done, task created for %s", address)
@method() @method()
def RequestDisconnection(self, device: "o") -> None: def RequestDisconnection(self, device: "o") -> None:
@ -182,7 +213,7 @@ class HFPAudioGatewayProfile(ServiceInterface):
conn.sock.close() conn.sock.close()
except Exception: except Exception:
pass pass
else: elif conn.fd >= 0:
try: try:
os.close(conn.fd) os.close(conn.fd)
except Exception: except Exception:
@ -193,17 +224,33 @@ class HFPAudioGatewayProfile(ServiceInterface):
async def _handle_connection(self, conn: HFPConnection) -> None: async def _handle_connection(self, conn: HFPConnection) -> None:
"""Read loop for AT commands from the HF device.""" """Read loop for AT commands from the HF device."""
try: try:
conn.sock = socket.fromfd(conn.fd, socket.AF_BLUETOOTH, socket.SOCK_STREAM) log.debug("_handle_connection start: addr=%s fd=%d", conn.address, conn.fd)
# socket.fromfd() dups the fd internally — close our intermediate copy
conn.sock = socket.fromfd(
conn.fd, _AF_BLUETOOTH, socket.SOCK_STREAM,
_BTPROTO_RFCOMM,
)
log.debug("socket.fromfd OK: fileno=%d family=%s type=%s",
conn.sock.fileno(), conn.sock.family, conn.sock.type)
try:
os.close(conn.fd)
except OSError:
pass
conn.fd = -1 # transferred to socket
conn.sock.setblocking(False) conn.sock.setblocking(False)
loop = asyncio.get_event_loop()
conn.reader, conn.writer = await asyncio.open_connection(sock=conn.sock) conn.reader, conn.writer = await asyncio.open_connection(sock=conn.sock)
log.debug("asyncio streams ready for %s, entering read loop", conn.address)
buf = b"" buf = b""
while True: while True:
data = await conn.reader.read(1024) data = await conn.reader.read(1024)
if not data: if not data:
log.debug("EOF from %s (clean disconnect)", conn.address)
break break
log.debug("recv %d bytes from %s: %r", len(data), conn.address, data[:80])
buf += data buf += data
while b"\r" in buf: while b"\r" in buf:
@ -213,13 +260,14 @@ class HFPAudioGatewayProfile(ServiceInterface):
if line: if line:
await self._process_at_command(conn, line.decode("utf-8", errors="replace")) await self._process_at_command(conn, line.decode("utf-8", errors="replace"))
except (ConnectionResetError, BrokenPipeError, OSError): except (ConnectionResetError, BrokenPipeError, OSError) as e:
log.debug("HFP AG: connection closed for %s", conn.address) log.debug("connection error for %s: %s: %s", conn.address, type(e).__name__, e)
except asyncio.CancelledError: except asyncio.CancelledError:
pass log.debug("task cancelled for %s", conn.address)
except Exception: except Exception:
log.exception("HFP AG: error handling connection for %s", conn.address) log.exception("UNEXPECTED error for %s", conn.address)
finally: finally:
log.debug("cleanup for %s", conn.address)
self.connections.pop(conn.device_path, None) self.connections.pop(conn.device_path, None)
self._cleanup_connection(conn) self._cleanup_connection(conn)
self._emit("hfp_ag_disconnect", {"address": conn.address}) self._emit("hfp_ag_disconnect", {"address": conn.address})
@ -227,7 +275,9 @@ class HFPAudioGatewayProfile(ServiceInterface):
async def _send(self, conn: HFPConnection, response: str) -> None: async def _send(self, conn: HFPConnection, response: str) -> None:
"""Send an AT response to the HF device.""" """Send an AT response to the HF device."""
if conn.writer and not conn.writer.is_closing(): if conn.writer and not conn.writer.is_closing():
conn.writer.write(f"\r\n{response}\r\n".encode()) data = f"\r\n{response}\r\n".encode()
log.debug("send %d bytes to %s: %r", len(data), conn.address, data)
conn.writer.write(data)
await conn.writer.drain() await conn.writer.drain()
log.debug("HFP AG → %s: %s", conn.address, response) log.debug("HFP AG → %s: %s", conn.address, response)
@ -271,6 +321,8 @@ class HFPAudioGatewayProfile(ServiceInterface):
await self._handle_dial(conn, line) await self._handle_dial(conn, line)
elif cmd.startswith("AT+DTMF="): elif cmd.startswith("AT+DTMF="):
await self._handle_dtmf(conn, line) await self._handle_dtmf(conn, line)
elif cmd.startswith("AT+VTS="):
await self._handle_dtmf(conn, line)
# Volume # Volume
elif cmd.startswith("AT+VGS="): elif cmd.startswith("AT+VGS="):
@ -290,6 +342,14 @@ class HFPAudioGatewayProfile(ServiceInterface):
elif cmd.startswith("AT+BVRA="): elif cmd.startswith("AT+BVRA="):
await self._handle_bvra(conn, line) await self._handle_bvra(conn, line)
# Post-SLC configuration
elif cmd.startswith("AT+BIA="):
await self._handle_bia(conn, line)
elif cmd.startswith("AT+CCWA="):
await self._handle_ccwa(conn, line)
elif cmd.startswith("AT+CLIP="):
await self._handle_clip(conn, line)
# Misc # Misc
elif cmd == "AT+CMEE=1": elif cmd == "AT+CMEE=1":
await self._send_ok(conn) await self._send_ok(conn)
@ -348,12 +408,15 @@ class HFPAudioGatewayProfile(ServiceInterface):
await self._send_ok(conn) await self._send_ok(conn)
async def _handle_bac(self, conn: HFPConnection, line: str) -> None: async def _handle_bac(self, conn: HFPConnection, line: str) -> None:
"""AT+BAC=<codec_ids> — available codecs from HF.""" """AT+BAC=<codec_ids> — available codecs from HF.
# 1=CVSD, 2=mSBC
Just acknowledge and store codec availability. Codec selection (+BCS)
happens after SLC is established, not during setup sending it here
confuses some Bluedroid implementations.
"""
# 1=CVSD, 2=mSBC — store for later codec negotiation
conn._hf_supports_msbc = "2" in line
await self._send_ok(conn) await self._send_ok(conn)
# If WBS supported, select mSBC (codec 2)
if "2" in line:
await self._send(conn, "+BCS: 2")
async def _handle_bcs(self, conn: HFPConnection, line: str) -> None: async def _handle_bcs(self, conn: HFPConnection, line: str) -> None:
"""AT+BCS=<codec_id> — codec confirmation from HF.""" """AT+BCS=<codec_id> — codec confirmation from HF."""
@ -431,7 +494,7 @@ class HFPAudioGatewayProfile(ServiceInterface):
}) })
async def _handle_dtmf(self, conn: HFPConnection, line: str) -> None: async def _handle_dtmf(self, conn: HFPConnection, line: str) -> None:
"""AT+DTMF=<code> — HF sends DTMF tone.""" """AT+VTS=<code> / AT+DTMF=<code> — HF sends DTMF tone."""
try: try:
code = line.split("=")[1].strip() code = line.split("=")[1].strip()
except IndexError: except IndexError:
@ -529,13 +592,54 @@ class HFPAudioGatewayProfile(ServiceInterface):
await self._send(conn, "+XAPL=iPhone,7") await self._send(conn, "+XAPL=iPhone,7")
await self._send_ok(conn) await self._send_ok(conn)
# ---- Post-SLC configuration handlers ----
async def _handle_bia(self, conn: HFPConnection, line: str) -> None:
"""AT+BIA=<indrep1>,<indrep2>,... — Bluetooth Indicators Activation.
Each parameter is 0 (deactivate) or 1 (activate) for the corresponding
CIND indicator by position. Controls which +CIEV updates the HF wants.
"""
try:
params = line.split("=", 1)[1].strip()
flags = [v.strip() == "1" for v in params.split(",")]
except (IndexError, ValueError):
flags = []
conn._indicator_active = flags
await self._send_ok(conn)
async def _handle_ccwa(self, conn: HFPConnection, line: str) -> None:
"""AT+CCWA=<n> — Call Waiting Notification enable/disable."""
try:
conn._ccwa_enabled = line.split("=")[1].strip() == "1"
except IndexError:
conn._ccwa_enabled = False
await self._send_ok(conn)
async def _handle_clip(self, conn: HFPConnection, line: str) -> None:
"""AT+CLIP=<n> — Calling Line Identification Presentation enable/disable."""
try:
conn._clip_enabled = line.split("=")[1].strip() == "1"
except IndexError:
conn._clip_enabled = False
await self._send_ok(conn)
# ==================== AG-initiated actions ==================== # ==================== AG-initiated actions ====================
async def _update_indicator(self, conn: HFPConnection, index: int, value: int) -> None: async def _update_indicator(self, conn: HFPConnection, index: int, value: int) -> None:
"""Send +CIEV indicator update to HF (1-based index).""" """Send +CIEV indicator update to HF (1-based index).
Respects AT+BIA activation flags if the HF deactivated this indicator,
we still store the value but don't send +CIEV over the air.
"""
if 1 <= index <= len(self.indicator_values): if 1 <= index <= len(self.indicator_values):
self.indicator_values[index - 1] = value self.indicator_values[index - 1] = value
if conn.slc_established: if conn.slc_established:
# Check AT+BIA flags (0-indexed); if no flags set, send all
bia_idx = index - 1
if conn._indicator_active and bia_idx < len(conn._indicator_active):
if not conn._indicator_active[bia_idx]:
return # HF doesn't want this indicator
await self._send(conn, f"+CIEV: {index},{value}") await self._send(conn, f"+CIEV: {index},{value}")
async def simulate_incoming_call( async def simulate_incoming_call(
@ -566,6 +670,7 @@ class HFPAudioGatewayProfile(ServiceInterface):
try: try:
while call.state == CallState.INCOMING: while call.state == CallState.INCOMING:
await self._send(conn, "RING") await self._send(conn, "RING")
if conn._clip_enabled:
await self._send(conn, f'+CLIP: "{number}",{number_type}') await self._send(conn, f'+CLIP: "{number}",{number_type}')
await asyncio.sleep(3.0) await asyncio.sleep(3.0)
except asyncio.CancelledError: except asyncio.CancelledError:
@ -581,12 +686,15 @@ class HFPAudioGatewayProfile(ServiceInterface):
return True return True
async def simulate_call_end(self, address: str) -> bool: async def simulate_call_end(self, address: str) -> bool:
"""End any active or ringing call from AG side.""" """End any active, ringing, or outgoing call from AG side."""
conn = self._get_connection(address) conn = self._get_connection(address)
if not conn: if not conn:
return False return False
active = [c for c in conn.calls if c.state in (CallState.ACTIVE, CallState.INCOMING)] active = [c for c in conn.calls if c.state in (
CallState.ACTIVE, CallState.INCOMING,
CallState.OUTGOING, CallState.ALERTING,
)]
if not active: if not active:
return False return False
@ -705,7 +813,12 @@ async def enable_hfp_ag() -> HFPAudioGatewayProfile:
_profile = HFPAudioGatewayProfile() _profile = HFPAudioGatewayProfile()
if _profile_bus is None: if _profile_bus is None:
_profile_bus = await MessageBus(bus_type=BusType.SYSTEM).connect() _profile_bus = await MessageBus(
bus_type=BusType.SYSTEM,
negotiate_unix_fd=True, # Required: BlueZ passes RFCOMM fd via D-Bus
).connect()
log.debug("D-Bus connected: negotiate_unix_fd=%s unique_name=%s",
_profile_bus._negotiate_unix_fd, _profile_bus.unique_name)
_profile_bus.export(HFP_AG_PROFILE_PATH, _profile) _profile_bus.export(HFP_AG_PROFILE_PATH, _profile)
# Register with ProfileManager1 # Register with ProfileManager1
@ -731,8 +844,20 @@ async def enable_hfp_ag() -> HFPAudioGatewayProfile:
log.info("HFP AG profile registered with BlueZ") log.info("HFP AG profile registered with BlueZ")
except Exception as e: except Exception as e:
if "Already Exists" in str(e): if "Already Exists" in str(e):
# Stale registration from a previous process that didn't clean up.
# Unregister the orphaned profile, then re-register with our bus.
log.info("HFP AG profile stale — unregistering and re-registering")
try:
await profile_mgr.call_unregister_profile(HFP_AG_PROFILE_PATH)
except Exception:
pass
await profile_mgr.call_register_profile(
HFP_AG_PROFILE_PATH,
HFP_AG_UUID,
options,
)
_profile_registered = True _profile_registered = True
log.info("HFP AG profile was already registered") log.info("HFP AG profile re-registered with BlueZ")
else: else:
raise raise