- Add recursive unwrap_variant() to handle nested Variant objects
- Convert manufacturer_data/service_data bytes to hex strings in DeviceInfo
- Remove redundant bytes-to-hex conversion from resources.py
BlueZ returns ManufacturerData as {int: Variant(bytes)}, which was not
being fully unwrapped and caused JSON serialization failures during scan.
607 lines
23 KiB
Python
607 lines
23 KiB
Python
"""BlueZ D-Bus client wrapper using dbus-fast.
|
|
|
|
This module provides an async interface to the BlueZ Bluetooth stack via D-Bus.
BlueZ exposes its functionality through several D-Bus interfaces:

- org.bluez.Adapter1: Adapter control (power, discoverable, scanning)
- org.bluez.Device1: Device management (pair, connect, trust)
- org.bluez.GattService1: BLE GATT services
- org.bluez.GattCharacteristic1: BLE GATT characteristics
- org.bluez.Agent1: Pairing agent for PIN/confirmation handling
- org.bluez.AgentManager1: Agent registration

Object paths follow this pattern:

- /org/bluez/hci0 - Adapter
- /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX - Device
- /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX/serviceXXXX - GATT service
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from dbus_fast import BusType, Variant
|
|
from dbus_fast.aio import MessageBus, ProxyInterface
|
|
|
|
# BlueZ constants
# Bus name passed to every introspect()/get_proxy_object() call in this module.
BLUEZ_SERVICE = "org.bluez"
# D-Bus interface names implemented by BlueZ objects.
BLUEZ_ADAPTER_IFACE = "org.bluez.Adapter1"
BLUEZ_DEVICE_IFACE = "org.bluez.Device1"
BLUEZ_GATT_SERVICE_IFACE = "org.bluez.GattService1"
BLUEZ_GATT_CHAR_IFACE = "org.bluez.GattCharacteristic1"
BLUEZ_GATT_DESC_IFACE = "org.bluez.GattDescriptor1"
BLUEZ_BATTERY_IFACE = "org.bluez.Battery1"
BLUEZ_AGENT_IFACE = "org.bluez.Agent1"
BLUEZ_AGENT_MANAGER_IFACE = "org.bluez.AgentManager1"

# Standard freedesktop interfaces: property Get/Set/GetAll and object enumeration.
DBUS_PROPS_IFACE = "org.freedesktop.DBus.Properties"
DBUS_OBJMANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
|
|
|
|
|
|
def unwrap_variant(value: Any) -> Any:
    """Recursively unwrap dbus-fast Variant objects to plain Python types.

    BlueZ returns nested structures like ManufacturerData = {int: Variant(bytes)}.
    This function ensures all Variant wrappers are removed at every level.

    Args:
        value: Any value received from D-Bus: a Variant, a container that may
            hold Variants at any depth, or a plain value.

    Returns:
        The same structure with every Variant replaced by its payload. Dicts,
        lists and tuples are rebuilt (dict keys are kept as-is); any other
        value, including ``bytes``, is returned unchanged.
    """
    if isinstance(value, Variant):
        # A Variant's payload can itself be a container or another Variant.
        return unwrap_variant(value.value)
    if isinstance(value, dict):
        return {key: unwrap_variant(item) for key, item in value.items()}
    if isinstance(value, list):
        return [unwrap_variant(item) for item in value]
    if isinstance(value, tuple):
        # Struct-like values may arrive as tuples; unwrap their members too so
        # no Variant survives to break JSON serialization downstream.
        return tuple(unwrap_variant(item) for item in value)
    return value
|
|
|
|
|
|
def address_to_path(adapter: str, address: str) -> str:
    """Build the BlueZ device object path for an address on an adapter.

    The address is upper-cased and its ":" separators become "_", e.g.
    ("hci0", "aa:bb:cc:dd:ee:ff") -> "/org/bluez/hci0/dev_AA_BB_CC_DD_EE_FF".
    """
    device_node = "dev_" + "_".join(address.upper().split(":"))
    return "/".join(("/org/bluez", adapter, device_node))
|
|
|
|
|
|
def path_to_address(path: str) -> str:
    """Recover the Bluetooth address encoded in a device object path.

    Only paths whose final segment is the ``dev_...`` node are recognised:
    /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX -> XX:XX:XX:XX:XX:XX.
    Anything else (too few segments, or a last segment that is not a device
    node) yields an empty string.
    """
    prefix = "dev_"
    segments = path.split("/")
    leaf = segments[-1]  # split() always returns at least one element
    if len(segments) < 5 or not leaf.startswith(prefix):
        return ""
    return leaf[len(prefix):].replace("_", ":")
|
|
|
|
|
|
@dataclass
class AdapterInfo:
    """Information about a Bluetooth adapter.

    Instances are built by BlueZClient.list_adapters() from the
    org.bluez.Adapter1 properties of each adapter object; the property each
    field is read from is noted beside it.
    """

    path: str  # D-Bus object path of the adapter
    name: str  # e.g., "hci0" (last segment of the object path)
    address: str  # "Address" property ("" when absent)
    alias: str  # "Alias" property ("" when absent)
    powered: bool  # "Powered" property
    discoverable: bool  # "Discoverable" property
    discoverable_timeout: int  # "DiscoverableTimeout" property (0 when absent)
    pairable: bool  # "Pairable" property
    pairable_timeout: int  # "PairableTimeout" property (0 when absent)
    discovering: bool  # "Discovering" property
    uuids: list[str] = field(default_factory=list)  # "UUIDs" property
    modalias: str = ""  # "Modalias" property
    device_class: int = 0  # "Class" property
|
|
|
|
|
|
@dataclass
class DeviceInfo:
    """Information about a Bluetooth device.

    Instances are built by BlueZClient.list_devices() from the
    org.bluez.Device1 properties of each device object; the property each
    field is read from is noted beside it.
    """

    path: str  # D-Bus object path of the device
    adapter: str  # adapter name taken from the object path, e.g. "hci0"
    address: str  # "Address" property ("" when absent)
    name: str  # "Name" property ("" when absent)
    alias: str  # "Alias" property ("" when absent)
    paired: bool  # "Paired" property
    bonded: bool  # "Bonded" property
    trusted: bool  # "Trusted" property
    blocked: bool  # "Blocked" property
    connected: bool  # "Connected" property
    rssi: int | None  # "RSSI" property; None when BlueZ does not report it
    tx_power: int | None  # "TxPower" property; None when BlueZ does not report it
    uuids: list[str] = field(default_factory=list)  # "UUIDs" property
    device_class: int = 0  # "Class" property
    appearance: int = 0  # "Appearance" property
    icon: str = ""  # "Icon" property
    services_resolved: bool = False  # "ServicesResolved" property
    # Advertisement payloads, stored as hex strings so the dataclass can be
    # serialized to JSON (raw bytes cannot).
    manufacturer_data: dict[int, str] = field(default_factory=dict)  # hex strings
    service_data: dict[str, str] = field(default_factory=dict)  # hex strings
|
|
|
|
|
|
@dataclass
class GattService:
    """Information about a GATT service.

    Instances are built by BlueZClient.list_gatt_services() from the
    org.bluez.GattService1 properties of each service object.
    """

    path: str  # D-Bus object path of the service
    uuid: str  # "UUID" property ("" when absent)
    primary: bool  # "Primary" property (list_gatt_services defaults it to True)
    device_path: str  # "Device" property, or the computed device path when absent
|
|
|
|
|
|
@dataclass
class GattCharacteristic:
    """Information about a GATT characteristic.

    Instances are built by BlueZClient.list_gatt_characteristics() from the
    org.bluez.GattCharacteristic1 properties of each characteristic object.
    """

    path: str  # D-Bus object path; this is the char_path the read/write/notify calls take
    uuid: str  # "UUID" property ("" when absent)
    service_path: str  # "Service" property: object path of the owning service
    flags: list[str] = field(default_factory=list)  # "Flags" property
    notifying: bool = False  # "Notifying" property
|
|
|
|
|
|
class BlueZClient:
    """Async client for BlueZ D-Bus API.

    Wraps one dbus-fast system-bus connection. The connection is opened
    lazily: every operation goes through ``_ensure_connected()``, so calling
    ``connect()`` explicitly is optional.

    Adapters are addressed by name (e.g. "hci0") and devices by adapter name
    plus Bluetooth address; object paths are derived from those.
    """

    # Maps a list_devices() filter name to the Device1 boolean property a
    # device must have set. Any other filter name (e.g. "all") keeps everything.
    _DEVICE_FILTER_PROPS = {
        "paired": "Paired",
        "connected": "Connected",
        "trusted": "Trusted",
    }

    def __init__(self) -> None:
        self._bus: MessageBus | None = None
        self._object_manager: ProxyInterface | None = None
        # Not used by any method of this class; kept so code that touches the
        # attribute from outside keeps working.
        self._objects_cache: dict[str, dict[str, dict[str, Any]]] = {}

    # ==================== Internal helpers ====================

    @staticmethod
    def _adapter_path(adapter: str) -> str:
        """Return the D-Bus object path of an adapter, e.g. "hci0" -> "/org/bluez/hci0"."""
        return f"/org/bluez/{adapter}"

    @staticmethod
    def _hex_values(payloads: dict[Any, Any] | None) -> dict[Any, str]:
        """Convert a mapping of binary payloads to hex strings (JSON-safe).

        ``bytes`` values are hex-encoded; lists, tuples and bytearrays of byte
        values are converted to ``bytes`` first; anything else is passed
        through ``str()``. Keys are left untouched.
        """
        encoded: dict[Any, str] = {}
        for key, payload in (payloads or {}).items():
            if isinstance(payload, (list, tuple, bytearray)):
                payload = bytes(payload)
            encoded[key] = payload.hex() if isinstance(payload, bytes) else str(payload)
        return encoded

    @classmethod
    def _build_device_info(cls, path: str, props: dict[str, Any]) -> DeviceInfo:
        """Build a DeviceInfo from a device object path and its Device1 properties."""
        # /org/bluez/hci0/dev_XX -> ["", "org", "bluez", "hci0", "dev_XX"]
        segments = path.split("/")
        adapter_name = segments[3] if len(segments) > 3 else ""

        return DeviceInfo(
            path=path,
            adapter=adapter_name,
            address=props.get("Address", ""),
            name=props.get("Name", ""),
            alias=props.get("Alias", ""),
            paired=props.get("Paired", False),
            bonded=props.get("Bonded", False),
            trusted=props.get("Trusted", False),
            blocked=props.get("Blocked", False),
            connected=props.get("Connected", False),
            rssi=props.get("RSSI"),
            tx_power=props.get("TxPower"),
            uuids=props.get("UUIDs", []),
            device_class=props.get("Class", 0),
            appearance=props.get("Appearance", 0),
            icon=props.get("Icon", ""),
            services_resolved=props.get("ServicesResolved", False),
            # Bytes are not JSON-serializable, so both maps are stored as hex strings.
            manufacturer_data=cls._hex_values(props.get("ManufacturerData")),
            service_data=cls._hex_values(props.get("ServiceData")),
        )

    # ==================== Connection management ====================

    async def connect(self) -> None:
        """Connect to the system D-Bus and bind the BlueZ ObjectManager.

        Does nothing when a connection already exists. Client state is only
        updated once both the bus and the ObjectManager proxy are ready, so a
        failed attempt leaves the client disconnected and a later call can
        retry cleanly.

        Raises:
            Whatever dbus-fast raises when the bus cannot be reached or the
            BlueZ root object cannot be introspected.
        """
        if self._bus is not None:
            return

        bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
        try:
            # Get the ObjectManager for BlueZ
            introspection = await bus.introspect(BLUEZ_SERVICE, "/")
            proxy = bus.get_proxy_object(BLUEZ_SERVICE, "/", introspection)
            object_manager = proxy.get_interface(DBUS_OBJMANAGER_IFACE)
        except BaseException:
            # BaseException so that task cancellation is covered too: never
            # keep (or leak) a bus whose ObjectManager could not be bound.
            bus.disconnect()
            raise

        self._bus = bus
        self._object_manager = object_manager

    async def disconnect(self) -> None:
        """Disconnect from D-Bus. Safe to call when not connected."""
        if self._bus:
            self._bus.disconnect()
            self._bus = None
            self._object_manager = None

    async def _ensure_connected(self) -> None:
        """Ensure we're connected to D-Bus, connecting on first use."""
        if self._bus is None:
            await self.connect()

    async def _get_managed_objects(self) -> dict[str, dict[str, dict[str, Any]]]:
        """Get all managed objects from BlueZ.

        Returns:
            ``{object_path: {interface_name: {property_name: value}}}`` with
            every Variant wrapper removed at every nesting level.
        """
        await self._ensure_connected()
        assert self._object_manager is not None  # narrowing for type checkers

        objects = await self._object_manager.call_get_managed_objects()
        # unwrap_variant recurses through the path -> interface -> property
        # dicts, so one call converts the whole tree to plain Python types.
        return unwrap_variant(objects)

    async def _get_interface(self, path: str, interface: str) -> ProxyInterface:
        """Get a proxy interface for an object.

        The object is introspected on every call; nothing is cached.
        """
        await self._ensure_connected()
        assert self._bus is not None  # narrowing for type checkers

        introspection = await self._bus.introspect(BLUEZ_SERVICE, path)
        proxy = self._bus.get_proxy_object(BLUEZ_SERVICE, path, introspection)
        return proxy.get_interface(interface)

    async def _get_property(self, path: str, interface: str, prop: str) -> Any:
        """Get a single property from an object, fully unwrapped."""
        props_iface = await self._get_interface(path, DBUS_PROPS_IFACE)
        result = await props_iface.call_get(interface, prop)
        # Use the recursive helper so container properties (e.g. dicts of
        # Variants) come back as plain types, like the other getters.
        return unwrap_variant(result)

    async def _set_property(self, path: str, interface: str, prop: str, value: Any) -> None:
        """Set a single property on an object.

        The D-Bus type is chosen from the Python type of ``value``:
        ``bool`` -> "b", ``int`` -> "u", ``str`` -> "s". A ready-made Variant
        is sent unchanged so callers can pick any other signature themselves.
        """
        props_iface = await self._get_interface(path, DBUS_PROPS_IFACE)

        # Wrap value in appropriate Variant
        if isinstance(value, Variant):
            # Already typed by the caller; wrapping it again would send a
            # variant-in-variant instead of the intended value.
            variant = value
        elif isinstance(value, bool):
            # Must be tested before int: bool is a subclass of int.
            variant = Variant("b", value)
        elif isinstance(value, int):
            variant = Variant("u", value)
        elif isinstance(value, str):
            variant = Variant("s", value)
        else:
            variant = Variant("v", value)

        await props_iface.call_set(interface, prop, variant)

    async def _get_all_properties(self, path: str, interface: str) -> dict[str, Any]:
        """Get all properties from an object, fully unwrapped."""
        props_iface = await self._get_interface(path, DBUS_PROPS_IFACE)
        props = await props_iface.call_get_all(interface)
        return {name: unwrap_variant(raw) for name, raw in props.items()}

    # ==================== Adapter Operations ====================

    async def list_adapters(self) -> list[AdapterInfo]:
        """List all Bluetooth adapters."""
        objects = await self._get_managed_objects()
        adapters = []

        for path, interfaces in objects.items():
            props = interfaces.get(BLUEZ_ADAPTER_IFACE)
            if props is None:
                continue

            adapters.append(
                AdapterInfo(
                    path=path,
                    # Extract adapter name from path: /org/bluez/hci0 -> hci0
                    name=path.split("/")[-1],
                    address=props.get("Address", ""),
                    alias=props.get("Alias", ""),
                    powered=props.get("Powered", False),
                    discoverable=props.get("Discoverable", False),
                    discoverable_timeout=props.get("DiscoverableTimeout", 0),
                    pairable=props.get("Pairable", False),
                    pairable_timeout=props.get("PairableTimeout", 0),
                    discovering=props.get("Discovering", False),
                    uuids=props.get("UUIDs", []),
                    modalias=props.get("Modalias", ""),
                    device_class=props.get("Class", 0),
                )
            )

        return adapters

    async def get_adapter(self, adapter: str) -> AdapterInfo | None:
        """Get info for a specific adapter, or None if no adapter has that name."""
        adapters = await self.list_adapters()
        return next((info for info in adapters if info.name == adapter), None)

    async def set_adapter_power(self, adapter: str, powered: bool) -> None:
        """Set adapter power state."""
        await self._set_property(
            self._adapter_path(adapter), BLUEZ_ADAPTER_IFACE, "Powered", powered
        )

    async def set_adapter_discoverable(
        self, adapter: str, discoverable: bool, timeout: int = 0
    ) -> None:
        """Set adapter discoverable state.

        ``DiscoverableTimeout`` is only written when ``timeout`` is positive;
        with the default of 0 the adapter's current timeout is left untouched.
        """
        path = self._adapter_path(adapter)
        if timeout > 0:
            await self._set_property(
                path, BLUEZ_ADAPTER_IFACE, "DiscoverableTimeout", timeout
            )
        await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Discoverable", discoverable)

    async def set_adapter_pairable(self, adapter: str, pairable: bool, timeout: int = 0) -> None:
        """Set adapter pairable state.

        ``PairableTimeout`` is only written when ``timeout`` is positive; with
        the default of 0 the adapter's current timeout is left untouched.
        """
        path = self._adapter_path(adapter)
        if timeout > 0:
            await self._set_property(path, BLUEZ_ADAPTER_IFACE, "PairableTimeout", timeout)
        await self._set_property(path, BLUEZ_ADAPTER_IFACE, "Pairable", pairable)

    async def set_adapter_alias(self, adapter: str, alias: str) -> None:
        """Set adapter friendly name."""
        await self._set_property(
            self._adapter_path(adapter), BLUEZ_ADAPTER_IFACE, "Alias", alias
        )

    # ==================== Discovery Operations ====================

    async def start_discovery(self, adapter: str) -> None:
        """Start device discovery on an adapter."""
        iface = await self._get_interface(self._adapter_path(adapter), BLUEZ_ADAPTER_IFACE)
        await iface.call_start_discovery()

    async def stop_discovery(self, adapter: str) -> None:
        """Stop device discovery on an adapter."""
        iface = await self._get_interface(self._adapter_path(adapter), BLUEZ_ADAPTER_IFACE)
        await iface.call_stop_discovery()

    async def set_discovery_filter(
        self,
        adapter: str,
        uuids: list[str] | None = None,
        rssi: int | None = None,
        transport: str = "auto",
        duplicate_data: bool = False,
    ) -> None:
        """Set discovery filter for BLE scanning.

        Args:
            adapter: Adapter name (e.g., "hci0")
            uuids: List of service UUIDs to filter by
            rssi: Minimum RSSI threshold
            transport: "auto", "bredr", or "le"
            duplicate_data: Report duplicate advertisements
        """
        iface = await self._get_interface(self._adapter_path(adapter), BLUEZ_ADAPTER_IFACE)

        # Transport and DuplicateData are always sent; UUIDs and RSSI only
        # when the caller supplied them.
        filter_dict: dict[str, Variant] = {}
        if uuids:
            filter_dict["UUIDs"] = Variant("as", uuids)
        if rssi is not None:
            filter_dict["RSSI"] = Variant("n", rssi)
        filter_dict["Transport"] = Variant("s", transport)
        filter_dict["DuplicateData"] = Variant("b", duplicate_data)

        await iface.call_set_discovery_filter(filter_dict)

    async def remove_discovery_filter(self, adapter: str) -> None:
        """Remove discovery filter."""
        iface = await self._get_interface(self._adapter_path(adapter), BLUEZ_ADAPTER_IFACE)
        # Set empty filter to clear
        await iface.call_set_discovery_filter({})

    # ==================== Device Operations ====================

    async def list_devices(
        self,
        adapter: str | None = None,
        filter_type: str = "all",
    ) -> list[DeviceInfo]:
        """List devices, optionally filtered.

        Args:
            adapter: Filter to devices on this adapter, or None for all
            filter_type: "all", "paired", "connected", or "trusted". Any
                unrecognised value behaves like "all".
        """
        objects = await self._get_managed_objects()
        required_prop = self._DEVICE_FILTER_PROPS.get(filter_type)
        # The trailing slash keeps "hci1" from matching "hci10" devices.
        adapter_prefix = f"/org/bluez/{adapter}/" if adapter else None

        devices = []
        for path, interfaces in objects.items():
            props = interfaces.get(BLUEZ_DEVICE_IFACE)
            if props is None:
                continue

            # Filter by adapter
            if adapter_prefix is not None and not path.startswith(adapter_prefix):
                continue

            # Apply filter
            if required_prop is not None and not props.get(required_prop, False):
                continue

            devices.append(self._build_device_info(path, props))

        return devices

    async def get_device(self, adapter: str, address: str) -> DeviceInfo | None:
        """Get info for a specific device (address match is case-insensitive)."""
        wanted = address.upper()
        devices = await self.list_devices(adapter=adapter)
        return next((dev for dev in devices if dev.address.upper() == wanted), None)

    async def pair_device(self, adapter: str, address: str) -> None:
        """Initiate pairing with a device.

        Note: This may require an agent for PIN entry or confirmation.
        """
        path = address_to_path(adapter, address)
        iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
        await iface.call_pair()

    async def cancel_pairing(self, adapter: str, address: str) -> None:
        """Cancel ongoing pairing."""
        path = address_to_path(adapter, address)
        iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
        await iface.call_cancel_pairing()

    async def connect_device(self, adapter: str, address: str) -> None:
        """Connect to a device."""
        path = address_to_path(adapter, address)
        iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
        await iface.call_connect()

    async def disconnect_device(self, adapter: str, address: str) -> None:
        """Disconnect from a device."""
        path = address_to_path(adapter, address)
        iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
        await iface.call_disconnect()

    async def remove_device(self, adapter: str, address: str) -> None:
        """Remove a device from the known devices list."""
        device_path = address_to_path(adapter, address)
        # RemoveDevice is a method of the adapter, taking the device path.
        iface = await self._get_interface(self._adapter_path(adapter), BLUEZ_ADAPTER_IFACE)
        await iface.call_remove_device(device_path)

    async def set_device_trusted(self, adapter: str, address: str, trusted: bool) -> None:
        """Set device trusted state."""
        path = address_to_path(adapter, address)
        await self._set_property(path, BLUEZ_DEVICE_IFACE, "Trusted", trusted)

    async def set_device_blocked(self, adapter: str, address: str, blocked: bool) -> None:
        """Set device blocked state."""
        path = address_to_path(adapter, address)
        await self._set_property(path, BLUEZ_DEVICE_IFACE, "Blocked", blocked)

    async def set_device_alias(self, adapter: str, address: str, alias: str) -> None:
        """Set device friendly name."""
        path = address_to_path(adapter, address)
        await self._set_property(path, BLUEZ_DEVICE_IFACE, "Alias", alias)

    # ==================== Profile Operations ====================

    async def connect_profile(self, adapter: str, address: str, uuid: str) -> None:
        """Connect a specific profile."""
        path = address_to_path(adapter, address)
        iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
        await iface.call_connect_profile(uuid)

    async def disconnect_profile(self, adapter: str, address: str, uuid: str) -> None:
        """Disconnect a specific profile."""
        path = address_to_path(adapter, address)
        iface = await self._get_interface(path, BLUEZ_DEVICE_IFACE)
        await iface.call_disconnect_profile(uuid)

    # ==================== GATT Operations ====================

    async def list_gatt_services(self, adapter: str, address: str) -> list[GattService]:
        """List GATT services for a device."""
        device_path = address_to_path(adapter, address)
        child_prefix = device_path + "/"
        objects = await self._get_managed_objects()
        services = []

        for path, interfaces in objects.items():
            # Only objects below the device's own path belong to it.
            if not path.startswith(child_prefix):
                continue
            props = interfaces.get(BLUEZ_GATT_SERVICE_IFACE)
            if props is None:
                continue

            services.append(
                GattService(
                    path=path,
                    uuid=props.get("UUID", ""),
                    primary=props.get("Primary", True),
                    device_path=props.get("Device", device_path),
                )
            )

        return services

    async def list_gatt_characteristics(
        self, adapter: str, address: str, service_uuid: str | None = None
    ) -> list[GattCharacteristic]:
        """List GATT characteristics for a device, optionally filtered by service.

        Args:
            adapter: Adapter name (e.g., "hci0")
            address: Bluetooth address of the device
            service_uuid: When given (and non-empty), only characteristics
                whose owning service has this UUID (case-insensitive) are
                returned.
        """
        device_path = address_to_path(adapter, address)
        child_prefix = device_path + "/"
        objects = await self._get_managed_objects()

        # First find services if filtering
        service_paths: set[str] = set()
        if service_uuid:
            wanted_uuid = service_uuid.lower()
            service_paths = {
                path
                for path, interfaces in objects.items()
                if BLUEZ_GATT_SERVICE_IFACE in interfaces
                and interfaces[BLUEZ_GATT_SERVICE_IFACE].get("UUID", "").lower() == wanted_uuid
            }

        characteristics = []
        for path, interfaces in objects.items():
            if not path.startswith(child_prefix):
                continue
            props = interfaces.get(BLUEZ_GATT_CHAR_IFACE)
            if props is None:
                continue

            service_path = props.get("Service", "")

            # Filter by service if specified
            if service_uuid and service_path not in service_paths:
                continue

            characteristics.append(
                GattCharacteristic(
                    path=path,
                    uuid=props.get("UUID", ""),
                    service_path=service_path,
                    flags=props.get("Flags", []),
                    notifying=props.get("Notifying", False),
                )
            )

        return characteristics

    async def read_characteristic(self, char_path: str) -> bytes:
        """Read a GATT characteristic value."""
        iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE)
        result = await iface.call_read_value({})
        return bytes(result)

    async def write_characteristic(
        self, char_path: str, value: bytes, write_type: str = "request"
    ) -> None:
        """Write a GATT characteristic value.

        Args:
            char_path: D-Bus path of the characteristic
            value: Bytes to write
            write_type: "request" (with response) or "command" (without response)
        """
        iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE)
        options = {"type": Variant("s", write_type)}
        # dbus-fast represents the D-Bus byte-array type ("ay") as bytes, so
        # send bytes rather than a list of ints; bytes() also accepts a
        # bytearray or memoryview from the caller.
        await iface.call_write_value(bytes(value), options)

    async def start_notify(self, char_path: str) -> None:
        """Start notifications for a characteristic."""
        iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE)
        await iface.call_start_notify()

    async def stop_notify(self, char_path: str) -> None:
        """Stop notifications for a characteristic."""
        iface = await self._get_interface(char_path, BLUEZ_GATT_CHAR_IFACE)
        await iface.call_stop_notify()

    # ==================== Battery Operations ====================

    async def get_battery_level(self, adapter: str, address: str) -> int | None:
        """Get battery level from Battery1 interface if available.

        Returns:
            The "Percentage" property, or None when the device is unknown,
            does not expose Battery1, or the property is missing.
        """
        path = address_to_path(adapter, address)
        objects = await self._get_managed_objects()

        battery_props = objects.get(path, {}).get(BLUEZ_BATTERY_IFACE)
        if battery_props is None:
            return None
        return battery_props.get("Percentage")
|
|
|
|
|
|
# Module-wide singleton, created lazily by get_client().
_client: BlueZClient | None = None


async def get_client() -> BlueZClient:
    """Return the shared BlueZClient, creating and connecting it on first use.

    The instance is stored in the module-level ``_client`` before
    ``connect()`` is awaited, and every later call returns that same object.
    """
    global _client
    if _client is not None:
        return _client

    _client = BlueZClient()
    await _client.connect()
    return _client
|