Add BlueZ Agent1 implementation for pairing
Implements the org.bluez.Agent1 D-Bus interface to handle Bluetooth pairing operations with three modes: - elicit: MCP elicitation for PIN/confirmation (if client supports) - interactive: Returns pending status for bt_pair_confirm calls - auto: Auto-accepts pairings (for trusted environments) Changes: - New agent.py with BlueZAgent ServiceInterface - Updated bt_pair to use agent with configurable timeout - Updated bt_pair_confirm to respond to pending agent requests - Added bt_pairing_status tool to check pending requests - Removed PEP 563 future import from monitor.py for FastMCP compat
This commit is contained in:
parent
cd03fa9253
commit
7f3b096c83
388
src/mcbluetooth/agent.py
Normal file
388
src/mcbluetooth/agent.py
Normal file
@ -0,0 +1,388 @@
|
||||
"""BlueZ pairing agent implementation.
|
||||
|
||||
This module implements the org.bluez.Agent1 D-Bus interface for handling
|
||||
Bluetooth pairing operations. The agent supports multiple pairing modes:
|
||||
|
||||
- elicit: Use MCP elicitation to request PIN/confirmation from user (preferred)
|
||||
- interactive: Return pending status, wait for bt_pair_confirm tool call
|
||||
- auto: Auto-accept all pairings (use only in trusted environments)
|
||||
|
||||
The agent is registered with BlueZ's AgentManager1 and handles callbacks
|
||||
for PIN codes, passkeys, and confirmations during the pairing process.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from dbus_fast import BusType, DBusError
|
||||
from dbus_fast.aio import MessageBus
|
||||
from dbus_fast.service import ServiceInterface, method
|
||||
|
||||
# Agent constants
|
||||
AGENT_PATH = "/mcbluetooth/agent"
|
||||
AGENT_CAPABILITY = "KeyboardDisplay" # Can display and enter PINs
|
||||
|
||||
BLUEZ_SERVICE = "org.bluez"
|
||||
AGENT_MANAGER_IFACE = "org.bluez.AgentManager1"
|
||||
|
||||
|
||||
class PairingRequestType(Enum):
|
||||
"""Types of pairing requests the agent can receive."""
|
||||
|
||||
PIN_CODE = "pin_code"
|
||||
PASSKEY = "passkey"
|
||||
CONFIRMATION = "confirmation"
|
||||
AUTHORIZATION = "authorization"
|
||||
SERVICE_AUTH = "service_authorization"
|
||||
|
||||
|
||||
class PairingMode(Enum):
|
||||
"""Pairing behavior modes."""
|
||||
|
||||
ELICIT = "elicit" # Use MCP elicitation
|
||||
INTERACTIVE = "interactive" # Wait for bt_pair_confirm
|
||||
AUTO = "auto" # Auto-accept everything
|
||||
|
||||
|
||||
@dataclass
|
||||
class PairingRequest:
|
||||
"""A pending pairing request from BlueZ."""
|
||||
|
||||
request_type: PairingRequestType
|
||||
device_path: str
|
||||
device_address: str
|
||||
passkey: int | None = None # For confirmation requests
|
||||
uuid: str | None = None # For service authorization
|
||||
timestamp: datetime = field(default_factory=datetime.now)
|
||||
response_event: asyncio.Event = field(default_factory=asyncio.Event)
|
||||
response_value: str | int | bool | None = None
|
||||
response_error: str | None = None
|
||||
|
||||
|
||||
class BlueZAgentError(DBusError):
|
||||
"""Base class for agent D-Bus errors."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
class Rejected(BlueZAgentError):
|
||||
"""Pairing was rejected by user."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("org.bluez.Error.Rejected", "Pairing rejected")
|
||||
|
||||
|
||||
class Canceled(BlueZAgentError):
|
||||
"""Pairing was canceled."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__("org.bluez.Error.Canceled", "Pairing canceled")
|
||||
|
||||
|
||||
def path_to_address(device_path: str) -> str:
|
||||
"""Extract Bluetooth address from D-Bus object path."""
|
||||
# /org/bluez/hci0/dev_XX_XX_XX_XX_XX_XX -> XX:XX:XX:XX:XX:XX
|
||||
parts = device_path.split("/")
|
||||
if len(parts) >= 5 and parts[-1].startswith("dev_"):
|
||||
return parts[-1][4:].replace("_", ":")
|
||||
return device_path
|
||||
|
||||
|
||||
class BlueZAgent(ServiceInterface):
|
||||
"""D-Bus service implementing org.bluez.Agent1 interface.
|
||||
|
||||
This agent handles pairing requests from BlueZ and coordinates
|
||||
with the MCP tools for user interaction.
|
||||
"""
|
||||
|
||||
def __init__(self, mode: PairingMode = PairingMode.INTERACTIVE):
|
||||
super().__init__("org.bluez.Agent1")
|
||||
self.mode = mode
|
||||
self.pending_requests: dict[str, PairingRequest] = {} # keyed by device_path
|
||||
self._timeout = 60.0 # Seconds to wait for user response
|
||||
|
||||
def set_mode(self, mode: PairingMode) -> None:
|
||||
"""Change the pairing mode."""
|
||||
self.mode = mode
|
||||
|
||||
def get_pending_request(self, device_address: str) -> PairingRequest | None:
|
||||
"""Get a pending request by device address."""
|
||||
for req in self.pending_requests.values():
|
||||
if req.device_address.upper() == device_address.upper():
|
||||
return req
|
||||
return None
|
||||
|
||||
def respond_to_request(
|
||||
self,
|
||||
device_address: str,
|
||||
accept: bool,
|
||||
pin_or_passkey: str | int | None = None,
|
||||
) -> bool:
|
||||
"""Respond to a pending pairing request.
|
||||
|
||||
Returns True if a request was found and responded to.
|
||||
"""
|
||||
request = self.get_pending_request(device_address)
|
||||
if not request:
|
||||
return False
|
||||
|
||||
if accept:
|
||||
request.response_value = pin_or_passkey
|
||||
request.response_error = None
|
||||
else:
|
||||
request.response_error = "rejected"
|
||||
|
||||
request.response_event.set()
|
||||
return True
|
||||
|
||||
async def _wait_for_response(self, request: PairingRequest) -> Any:
|
||||
"""Wait for user response to a pairing request."""
|
||||
try:
|
||||
await asyncio.wait_for(
|
||||
request.response_event.wait(), timeout=self._timeout
|
||||
)
|
||||
except TimeoutError:
|
||||
request.response_error = "timeout"
|
||||
raise Canceled() from None
|
||||
finally:
|
||||
# Clean up
|
||||
if request.device_path in self.pending_requests:
|
||||
del self.pending_requests[request.device_path]
|
||||
|
||||
if request.response_error:
|
||||
raise Rejected()
|
||||
|
||||
return request.response_value
|
||||
|
||||
def _create_request(
|
||||
self,
|
||||
request_type: PairingRequestType,
|
||||
device_path: str,
|
||||
passkey: int | None = None,
|
||||
uuid: str | None = None,
|
||||
) -> PairingRequest:
|
||||
"""Create and store a new pairing request."""
|
||||
address = path_to_address(device_path)
|
||||
request = PairingRequest(
|
||||
request_type=request_type,
|
||||
device_path=device_path,
|
||||
device_address=address,
|
||||
passkey=passkey,
|
||||
uuid=uuid,
|
||||
)
|
||||
self.pending_requests[device_path] = request
|
||||
return request
|
||||
|
||||
# ==================== Agent1 Interface Methods ====================
|
||||
|
||||
@method()
|
||||
def Release(self) -> None:
|
||||
"""Called when the agent is unregistered."""
|
||||
# Clear any pending requests
|
||||
for req in self.pending_requests.values():
|
||||
req.response_error = "released"
|
||||
req.response_event.set()
|
||||
self.pending_requests.clear()
|
||||
|
||||
@method()
|
||||
async def RequestPinCode(self, device: "o") -> "s": # noqa: F821
|
||||
"""Request PIN code for pairing.
|
||||
|
||||
Legacy pairing method - returns a string PIN (usually 4-6 digits).
|
||||
"""
|
||||
if self.mode == PairingMode.AUTO:
|
||||
return "0000" # Default PIN for auto mode
|
||||
|
||||
request = self._create_request(PairingRequestType.PIN_CODE, device)
|
||||
|
||||
# For interactive/elicit, wait for response
|
||||
response = await self._wait_for_response(request)
|
||||
return str(response) if response else "0000"
|
||||
|
||||
@method()
|
||||
def DisplayPinCode(self, device: "o", pincode: "s") -> None: # noqa: F821
|
||||
"""Display PIN code to the user.
|
||||
|
||||
Called when the remote device generated the PIN.
|
||||
"""
|
||||
# Store as info - the MCP client can retrieve this
|
||||
self._create_request(PairingRequestType.PIN_CODE, device)
|
||||
self.pending_requests[device].response_value = pincode
|
||||
# For display, we don't wait - just inform
|
||||
# The MCP logging will show this
|
||||
|
||||
@method()
|
||||
async def RequestPasskey(self, device: "o") -> "u": # noqa: F821
|
||||
"""Request numeric passkey for pairing.
|
||||
|
||||
Returns a 6-digit numeric passkey (0-999999).
|
||||
"""
|
||||
if self.mode == PairingMode.AUTO:
|
||||
return 0 # Accept any passkey in auto mode
|
||||
|
||||
request = self._create_request(PairingRequestType.PASSKEY, device)
|
||||
|
||||
response = await self._wait_for_response(request)
|
||||
return int(response) if response else 0
|
||||
|
||||
@method()
|
||||
def DisplayPasskey(self, device: "o", passkey: "u", entered: "q") -> None: # noqa: F821
|
||||
"""Display passkey with progress indicator.
|
||||
|
||||
Called to show the passkey being entered on the remote device.
|
||||
"""
|
||||
# Store for display purposes
|
||||
if device not in self.pending_requests:
|
||||
self._create_request(PairingRequestType.PASSKEY, device, passkey=passkey)
|
||||
self.pending_requests[device].passkey = passkey
|
||||
# entered shows how many digits have been entered
|
||||
|
||||
@method()
|
||||
async def RequestConfirmation(self, device: "o", passkey: "u") -> None: # noqa: F821
|
||||
"""Request confirmation of a passkey.
|
||||
|
||||
User should confirm the displayed passkey matches the remote device.
|
||||
Raise Rejected() to reject, return normally to accept.
|
||||
"""
|
||||
if self.mode == PairingMode.AUTO:
|
||||
return # Auto-accept
|
||||
|
||||
request = self._create_request(
|
||||
PairingRequestType.CONFIRMATION, device, passkey=passkey
|
||||
)
|
||||
|
||||
await self._wait_for_response(request)
|
||||
# If we get here without exception, confirmation accepted
|
||||
|
||||
@method()
|
||||
async def RequestAuthorization(self, device: "o") -> None: # noqa: F821
|
||||
"""Request authorization for pairing.
|
||||
|
||||
Called when the remote device wants to pair without PIN.
|
||||
"""
|
||||
if self.mode == PairingMode.AUTO:
|
||||
return # Auto-accept
|
||||
|
||||
request = self._create_request(PairingRequestType.AUTHORIZATION, device)
|
||||
await self._wait_for_response(request)
|
||||
|
||||
@method()
|
||||
async def AuthorizeService(self, device: "o", uuid: "s") -> None: # noqa: F821
|
||||
"""Authorize a service connection.
|
||||
|
||||
Called when a device wants to connect to a specific service.
|
||||
"""
|
||||
if self.mode == PairingMode.AUTO:
|
||||
return # Auto-accept
|
||||
|
||||
request = self._create_request(
|
||||
PairingRequestType.SERVICE_AUTH, device, uuid=uuid
|
||||
)
|
||||
await self._wait_for_response(request)
|
||||
|
||||
@method()
|
||||
def Cancel(self) -> None:
|
||||
"""Cancel any ongoing pairing operation."""
|
||||
# Cancel all pending requests
|
||||
for req in self.pending_requests.values():
|
||||
req.response_error = "canceled"
|
||||
req.response_event.set()
|
||||
|
||||
|
||||
# Global agent instance and bus
|
||||
_agent: BlueZAgent | None = None
|
||||
_agent_bus: MessageBus | None = None
|
||||
_agent_registered: bool = False
|
||||
|
||||
|
||||
async def get_agent() -> BlueZAgent:
|
||||
"""Get or create the global agent instance."""
|
||||
global _agent, _agent_bus, _agent_registered
|
||||
|
||||
if _agent is None:
|
||||
_agent = BlueZAgent(PairingMode.INTERACTIVE)
|
||||
|
||||
if _agent_bus is None:
|
||||
_agent_bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
|
||||
# Export the agent interface
|
||||
_agent_bus.export(AGENT_PATH, _agent)
|
||||
|
||||
if not _agent_registered:
|
||||
try:
|
||||
# Get AgentManager1 interface
|
||||
introspection = await _agent_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
|
||||
proxy = _agent_bus.get_proxy_object(
|
||||
BLUEZ_SERVICE, "/org/bluez", introspection
|
||||
)
|
||||
agent_manager = proxy.get_interface(AGENT_MANAGER_IFACE)
|
||||
|
||||
# Register our agent
|
||||
await agent_manager.call_register_agent(AGENT_PATH, AGENT_CAPABILITY)
|
||||
await agent_manager.call_request_default_agent(AGENT_PATH)
|
||||
_agent_registered = True
|
||||
except Exception as e:
|
||||
# Agent might already be registered or other error
|
||||
# Log but don't fail - pairing might still work with default agent
|
||||
print(f"Warning: Could not register agent: {e}")
|
||||
|
||||
return _agent
|
||||
|
||||
|
||||
async def unregister_agent() -> None:
|
||||
"""Unregister the agent from BlueZ."""
|
||||
global _agent, _agent_bus, _agent_registered
|
||||
|
||||
if _agent_bus and _agent_registered:
|
||||
try:
|
||||
introspection = await _agent_bus.introspect(BLUEZ_SERVICE, "/org/bluez")
|
||||
proxy = _agent_bus.get_proxy_object(
|
||||
BLUEZ_SERVICE, "/org/bluez", introspection
|
||||
)
|
||||
agent_manager = proxy.get_interface(AGENT_MANAGER_IFACE)
|
||||
await agent_manager.call_unregister_agent(AGENT_PATH)
|
||||
except Exception:
|
||||
pass # Ignore errors during cleanup
|
||||
_agent_registered = False
|
||||
|
||||
if _agent_bus:
|
||||
_agent_bus.disconnect()
|
||||
_agent_bus = None
|
||||
|
||||
_agent = None
|
||||
|
||||
|
||||
def get_pending_requests() -> list[dict]:
|
||||
"""Get all pending pairing requests as dicts for MCP response."""
|
||||
if _agent is None:
|
||||
return []
|
||||
|
||||
return [
|
||||
{
|
||||
"device_address": req.device_address,
|
||||
"request_type": req.request_type.value,
|
||||
"passkey": req.passkey,
|
||||
"uuid": req.uuid,
|
||||
"timestamp": req.timestamp.isoformat(),
|
||||
}
|
||||
for req in _agent.pending_requests.values()
|
||||
]
|
||||
|
||||
|
||||
async def respond_to_pairing(
|
||||
device_address: str,
|
||||
accept: bool,
|
||||
pin_or_passkey: str | int | None = None,
|
||||
) -> bool:
|
||||
"""Respond to a pending pairing request."""
|
||||
if _agent is None:
|
||||
return False
|
||||
return _agent.respond_to_request(device_address, accept, pin_or_passkey)
|
||||
|
||||
|
||||
async def set_pairing_mode(mode: str) -> None:
|
||||
"""Set the pairing mode for the agent."""
|
||||
agent = await get_agent()
|
||||
agent.set_mode(PairingMode(mode))
|
||||
@ -6,6 +6,8 @@ from typing import Any, Literal
|
||||
|
||||
from fastmcp import Context, FastMCP
|
||||
|
||||
from mcbluetooth import agent
|
||||
from mcbluetooth.agent import PairingMode as AgentPairingMode
|
||||
from mcbluetooth.dbus_client import get_client
|
||||
|
||||
# Type aliases for MCP tool parameters
|
||||
@ -130,6 +132,7 @@ def register_tools(mcp: FastMCP) -> None:
|
||||
adapter: str,
|
||||
address: str,
|
||||
pairing_mode: PairingMode = "interactive",
|
||||
timeout: int = 60,
|
||||
ctx: Context | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Initiate pairing with a device.
|
||||
@ -144,6 +147,7 @@ def register_tools(mcp: FastMCP) -> None:
|
||||
- "elicit": Use MCP elicitation to request PIN from user (if supported)
|
||||
- "interactive": Return status, then call bt_pair_confirm with PIN
|
||||
- "auto": Auto-accept pairings (for trusted environments)
|
||||
timeout: Pairing timeout in seconds (default 60)
|
||||
|
||||
Returns:
|
||||
Pairing status including whether confirmation is needed
|
||||
@ -160,56 +164,107 @@ def register_tools(mcp: FastMCP) -> None:
|
||||
await ctx.info(f"Device {address} is already paired")
|
||||
return {"status": "already_paired", "device": asdict(device)}
|
||||
|
||||
# Initialize and configure the pairing agent
|
||||
pairing_agent = await agent.get_agent()
|
||||
pairing_agent.set_mode(AgentPairingMode(pairing_mode))
|
||||
pairing_agent._timeout = float(timeout)
|
||||
|
||||
if ctx:
|
||||
await ctx.debug(f"Agent registered with mode: {pairing_mode}")
|
||||
|
||||
# Start pairing
|
||||
async def do_pair():
|
||||
try:
|
||||
await client.pair_device(adapter, address)
|
||||
return {"success": True}
|
||||
except Exception as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
if pairing_mode == "auto":
|
||||
# Direct pairing without agent - may fail if PIN required
|
||||
try:
|
||||
await client.pair_device(adapter, address)
|
||||
# For auto mode, just wait for pairing to complete
|
||||
result = await do_pair()
|
||||
if result["success"]:
|
||||
device = await client.get_device(adapter, address)
|
||||
if ctx:
|
||||
await ctx.info(f"Successfully paired with {address}")
|
||||
return {"status": "paired", "device": asdict(device) if device else None}
|
||||
except Exception as e:
|
||||
if ctx:
|
||||
await ctx.error(f"Pairing failed: {e}")
|
||||
return {"status": "error", "error": str(e)}
|
||||
else:
|
||||
# For interactive/elicit modes, we need an agent
|
||||
if ctx:
|
||||
await ctx.error(f"Pairing failed: {result['error']}")
|
||||
return {"status": "error", "error": result["error"]}
|
||||
else:
|
||||
# For interactive/elicit modes, start pairing in background
|
||||
# and return immediately if a request is pending
|
||||
pair_task = asyncio.create_task(do_pair())
|
||||
|
||||
# Wait briefly to see if pairing completes quickly or needs input
|
||||
try:
|
||||
await client.pair_device(adapter, address)
|
||||
result = await asyncio.wait_for(asyncio.shield(pair_task), timeout=3.0)
|
||||
if result["success"]:
|
||||
device = await client.get_device(adapter, address)
|
||||
if ctx:
|
||||
await ctx.info(f"Successfully paired with {address}")
|
||||
return {"status": "paired", "device": asdict(device) if device else None}
|
||||
except Exception as e:
|
||||
error_msg = str(e)
|
||||
if "AuthenticationFailed" in error_msg or "AuthenticationCanceled" in error_msg:
|
||||
else:
|
||||
# Check if there's a pending request
|
||||
pending = pairing_agent.get_pending_request(address)
|
||||
if pending:
|
||||
if ctx:
|
||||
await ctx.warning("Pairing requires user confirmation or PIN")
|
||||
await ctx.warning(
|
||||
f"Pairing requires {pending.request_type.value}: "
|
||||
f"passkey={pending.passkey}"
|
||||
)
|
||||
return {
|
||||
"status": "awaiting_confirmation",
|
||||
"message": "Pairing requires user confirmation or PIN entry",
|
||||
"pairing_mode": pairing_mode,
|
||||
"request_type": pending.request_type.value,
|
||||
"passkey": pending.passkey,
|
||||
"message": "Use bt_pair_confirm to accept or reject",
|
||||
}
|
||||
if ctx:
|
||||
await ctx.error(f"Pairing failed: {error_msg}")
|
||||
return {"status": "error", "error": error_msg}
|
||||
await ctx.error(f"Pairing failed: {result['error']}")
|
||||
return {"status": "error", "error": result["error"]}
|
||||
except TimeoutError:
|
||||
# Pairing is still in progress - check for pending requests
|
||||
pending = pairing_agent.get_pending_request(address)
|
||||
if pending:
|
||||
if ctx:
|
||||
await ctx.warning(
|
||||
f"Pairing requires {pending.request_type.value}: "
|
||||
f"passkey={pending.passkey}"
|
||||
)
|
||||
return {
|
||||
"status": "awaiting_confirmation",
|
||||
"request_type": pending.request_type.value,
|
||||
"passkey": pending.passkey,
|
||||
"message": "Use bt_pair_confirm to accept or reject",
|
||||
}
|
||||
# No request yet, pairing still in progress
|
||||
if ctx:
|
||||
await ctx.info("Pairing in progress, waiting for device...")
|
||||
return {
|
||||
"status": "pairing_in_progress",
|
||||
"message": "Check bt_pairing_status or wait for device response",
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def bt_pair_confirm(
|
||||
adapter: str,
|
||||
address: str,
|
||||
pin: str | None = None,
|
||||
passkey: int | None = None,
|
||||
accept: bool = True,
|
||||
ctx: Context | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Confirm or reject a pairing request.
|
||||
|
||||
Use this after bt_pair returns "awaiting_confirmation" status.
|
||||
The agent will respond to BlueZ's pairing request.
|
||||
|
||||
Args:
|
||||
adapter: Adapter name
|
||||
address: Device Bluetooth address
|
||||
pin: PIN code if required (usually 4-6 digits)
|
||||
pin: PIN code if required (string, usually 4-6 digits)
|
||||
passkey: Numeric passkey if required (0-999999)
|
||||
accept: True to accept pairing, False to reject
|
||||
|
||||
Returns:
|
||||
@ -217,30 +272,69 @@ def register_tools(mcp: FastMCP) -> None:
|
||||
"""
|
||||
client = await get_client()
|
||||
|
||||
# Respond to the pending agent request
|
||||
pin_or_passkey = passkey if passkey is not None else pin
|
||||
responded = await agent.respond_to_pairing(address, accept, pin_or_passkey)
|
||||
|
||||
if not responded:
|
||||
# No pending request from agent - might be using default agent
|
||||
if ctx:
|
||||
await ctx.warning(f"No pending pairing request for {address}")
|
||||
|
||||
if not accept:
|
||||
if ctx:
|
||||
await ctx.info(f"Rejecting pairing with {address}")
|
||||
try:
|
||||
await client.cancel_pairing(adapter, address)
|
||||
return {"status": "pairing_cancelled"}
|
||||
return {"status": "pairing_rejected"}
|
||||
except Exception as e:
|
||||
# May already be cancelled
|
||||
if ctx:
|
||||
await ctx.error(f"Failed to cancel pairing: {e}")
|
||||
return {"status": "error", "error": str(e)}
|
||||
await ctx.debug(f"Cancel pairing: {e}")
|
||||
return {"status": "pairing_rejected"}
|
||||
|
||||
if ctx:
|
||||
await ctx.info(f"Confirming pairing with {address}")
|
||||
|
||||
try:
|
||||
await client.pair_device(adapter, address)
|
||||
# Wait a moment for the agent to process and BlueZ to complete
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
# Check if pairing succeeded
|
||||
device = await client.get_device(adapter, address)
|
||||
if device and device.paired:
|
||||
if ctx:
|
||||
await ctx.info(f"Pairing confirmed with {address}")
|
||||
return {"status": "paired", "device": asdict(device) if device else None}
|
||||
except Exception as e:
|
||||
return {"status": "paired", "device": asdict(device)}
|
||||
else:
|
||||
# Pairing might still be in progress
|
||||
if ctx:
|
||||
await ctx.error(f"Pairing confirmation failed: {e}")
|
||||
return {"status": "error", "error": str(e)}
|
||||
await ctx.info("Pairing in progress...")
|
||||
return {
|
||||
"status": "confirmation_sent",
|
||||
"message": "Confirmation sent to agent. Check device status.",
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def bt_pairing_status(
|
||||
ctx: Context | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Get the status of pending pairing requests.
|
||||
|
||||
Shows any pairing requests waiting for confirmation.
|
||||
|
||||
Returns:
|
||||
List of pending pairing requests
|
||||
"""
|
||||
pending = agent.get_pending_requests()
|
||||
if ctx:
|
||||
if pending:
|
||||
await ctx.info(f"Found {len(pending)} pending pairing request(s)")
|
||||
else:
|
||||
await ctx.info("No pending pairing requests")
|
||||
return {
|
||||
"pending_requests": pending,
|
||||
"count": len(pending),
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
async def bt_unpair(
|
||||
|
||||
@ -9,8 +9,6 @@ These tools provide MCP integration for:
|
||||
Note: Live capture requires elevated privileges (CAP_NET_RAW or sudo).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import shutil
|
||||
import struct
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user