Fix JSON serialization for D-Bus advertisement data
- Add recursive unwrap_variant() to handle nested Variant objects
- Convert manufacturer_data/service_data bytes to hex strings in DeviceInfo
- Remove redundant bytes-to-hex conversion from resources.py
BlueZ returns ManufacturerData as {int: Variant(bytes)} which wasn't
being fully unwrapped, causing JSON serialization failures during scan.
This commit is contained in:
parent
61e424ab40
commit
3e3d77068b
1
.gitignore
vendored
1
.gitignore
vendored
@ -44,3 +44,4 @@ uv.lock
|
||||
|
||||
# Project specific
|
||||
*.log
|
||||
.mcp.json
|
||||
|
||||
@ -39,6 +39,21 @@ DBUS_PROPS_IFACE = "org.freedesktop.DBus.Properties"
|
||||
DBUS_OBJMANAGER_IFACE = "org.freedesktop.DBus.ObjectManager"
|
||||
|
||||
|
||||
def unwrap_variant(value: Any) -> Any:
|
||||
"""Recursively unwrap dbus-fast Variant objects to plain Python types.
|
||||
|
||||
BlueZ returns nested structures like ManufacturerData = {int: Variant(bytes)}.
|
||||
This function ensures all Variant wrappers are removed at every level.
|
||||
"""
|
||||
if isinstance(value, Variant):
|
||||
return unwrap_variant(value.value)
|
||||
elif isinstance(value, dict):
|
||||
return {k: unwrap_variant(v) for k, v in value.items()}
|
||||
elif isinstance(value, list):
|
||||
return [unwrap_variant(v) for v in value]
|
||||
return value
|
||||
|
||||
|
||||
def address_to_path(adapter: str, address: str) -> str:
|
||||
"""Convert a Bluetooth address to D-Bus object path."""
|
||||
addr_path = address.upper().replace(":", "_")
|
||||
@ -94,8 +109,8 @@ class DeviceInfo:
|
||||
appearance: int = 0
|
||||
icon: str = ""
|
||||
services_resolved: bool = False
|
||||
manufacturer_data: dict[int, bytes] = field(default_factory=dict)
|
||||
service_data: dict[str, bytes] = field(default_factory=dict)
|
||||
manufacturer_data: dict[int, str] = field(default_factory=dict) # hex strings
|
||||
service_data: dict[str, str] = field(default_factory=dict) # hex strings
|
||||
|
||||
|
||||
@dataclass
|
||||
@ -163,7 +178,7 @@ class BlueZClient:
|
||||
result[path] = {}
|
||||
for iface, props in interfaces.items():
|
||||
result[path][iface] = {
|
||||
k: v.value if isinstance(v, Variant) else v for k, v in props.items()
|
||||
k: unwrap_variant(v) for k, v in props.items()
|
||||
}
|
||||
return result
|
||||
|
||||
@ -200,7 +215,7 @@ class BlueZClient:
|
||||
"""Get all properties from an object."""
|
||||
props_iface = await self._get_interface(path, DBUS_PROPS_IFACE)
|
||||
props = await props_iface.call_get_all(interface)
|
||||
return {k: v.value if isinstance(v, Variant) else v for k, v in props.items()}
|
||||
return {k: unwrap_variant(v) for k, v in props.items()}
|
||||
|
||||
# ==================== Adapter Operations ====================
|
||||
|
||||
@ -358,17 +373,19 @@ class BlueZClient:
|
||||
# Extract adapter name from path
|
||||
adapter_name = path.split("/")[3] if len(path.split("/")) > 3 else ""
|
||||
|
||||
# Parse manufacturer data
|
||||
# Parse manufacturer data (convert bytes to hex strings for JSON)
|
||||
mfr_data = {}
|
||||
if "ManufacturerData" in props:
|
||||
for k, v in props["ManufacturerData"].items():
|
||||
mfr_data[k] = bytes(v) if isinstance(v, (list, bytearray)) else v
|
||||
raw = bytes(v) if isinstance(v, (list, bytearray)) else v
|
||||
mfr_data[k] = raw.hex() if isinstance(raw, bytes) else str(raw)
|
||||
|
||||
# Parse service data
|
||||
# Parse service data (convert bytes to hex strings for JSON)
|
||||
svc_data = {}
|
||||
if "ServiceData" in props:
|
||||
for k, v in props["ServiceData"].items():
|
||||
svc_data[k] = bytes(v) if isinstance(v, (list, bytearray)) else v
|
||||
raw = bytes(v) if isinstance(v, (list, bytearray)) else v
|
||||
svc_data[k] = raw.hex() if isinstance(raw, bytes) else str(raw)
|
||||
|
||||
devices.append(
|
||||
DeviceInfo(
|
||||
|
||||
@ -193,17 +193,6 @@ def register_resources(mcp: FastMCP) -> None:
|
||||
devices = await client.list_devices()
|
||||
for d in devices:
|
||||
if d.address.upper() == address.upper():
|
||||
data = asdict(d)
|
||||
# Convert bytes to hex strings for JSON serialization
|
||||
if data.get("manufacturer_data"):
|
||||
data["manufacturer_data"] = {
|
||||
k: v.hex() if isinstance(v, bytes) else str(v)
|
||||
for k, v in data["manufacturer_data"].items()
|
||||
}
|
||||
if data.get("service_data"):
|
||||
data["service_data"] = {
|
||||
k: v.hex() if isinstance(v, bytes) else str(v)
|
||||
for k, v in data["service_data"].items()
|
||||
}
|
||||
return json.dumps(data, indent=2)
|
||||
# DeviceInfo already has manufacturer_data/service_data as hex strings
|
||||
return json.dumps(asdict(d), indent=2)
|
||||
return json.dumps({"error": f"Device '{address}' not found"})
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user