Add OBEX profile support (OPP, FTP, PBAP, MAP)

23 new tools for file transfer, phonebook access, and messaging
via obexd D-Bus API. Includes session management, transfer
monitoring, and daemon lifecycle tools.
This commit is contained in:
Ryan Malloy 2026-02-02 14:36:02 -07:00
parent 8cbbcfa286
commit e9f06173c5
5 changed files with 2384 additions and 12 deletions

106
README.md
View File

@ -39,6 +39,7 @@
- **Smart Pairing Agent** — Handles PIN codes, passkeys, and confirmations automatically
- **Protocol Analysis** — Capture and analyze Bluetooth traffic with btmon integration
- **Audio Intelligence** — PipeWire/PulseAudio integration for seamless audio control
- **OBEX Profiles** — File transfer, phonebook access, and message access
## Example Conversation
@ -115,6 +116,24 @@ For HCI packet capture (btmon), additional permissions are needed:
sudo setcap cap_net_raw+ep /usr/bin/btmon
```
### OBEX Setup (for file transfer, phonebook, messages)
OBEX features require the `obexd` daemon:
```bash
# Install obexd (if not included with BlueZ)
# Arch Linux
sudo pacman -S bluez-obex
# Debian/Ubuntu
sudo apt install bluez-obex
# Check status
bt_obex_status # Use this tool to verify setup
```
The `bt_obex_status` tool will check if obexd is installed and running, and provide guidance if setup is needed.
## Features
### Adapter Management
@ -165,6 +184,24 @@ Capture and analyze Bluetooth traffic for debugging and reverse engineering.
"Analyze the capture file for errors"
```
### OBEX File Transfer
Send files, browse phone storage, and download photos using OBEX profiles.
```
"Send this PDF to my phone"
"Browse the files on my phone's storage"
"Download all photos from DCIM folder"
```
### Phonebook & Messages
Access contacts and SMS messages from connected phones via PBAP and MAP profiles.
```
"Download my phone's contacts"
"Search for John in the phonebook"
"List unread text messages"
```
## Tools Reference
### Adapter Tools (6)
@ -225,7 +262,56 @@ Capture and analyze Bluetooth traffic for debugging and reverse engineering.
| `bt_capture_analyze` | Statistical analysis of capture |
| `bt_capture_read_raw` | Human-readable packet decode |
**Total: 38 tools**
### OBEX Tools (23)
#### Setup & Session Management
| Tool | Description |
|------|-------------|
| `bt_obex_status` | Check obexd daemon status and requirements |
| `bt_obex_start_daemon` | Start the obexd daemon |
| `bt_obex_connect` | Create OBEX session (OPP/FTP/PBAP/MAP) |
| `bt_obex_disconnect` | Close OBEX session |
| `bt_obex_sessions` | List active OBEX sessions |
#### Object Push (OPP)
| Tool | Description |
|------|-------------|
| `bt_obex_send_file` | Send file to device |
| `bt_obex_get_vcard` | Pull business card from device |
#### File Transfer (FTP)
| Tool | Description |
|------|-------------|
| `bt_obex_browse` | List files/folders on device |
| `bt_obex_get` | Download file from device |
| `bt_obex_put` | Upload file to device |
| `bt_obex_delete` | Delete file/folder on device |
| `bt_obex_mkdir` | Create folder on device |
#### Phonebook Access (PBAP)
| Tool | Description |
|------|-------------|
| `bt_phonebook_pull` | Download entire phonebook |
| `bt_phonebook_list` | List phonebook entries |
| `bt_phonebook_get` | Download single contact |
| `bt_phonebook_search` | Search phonebook |
| `bt_phonebook_count` | Count phonebook entries |
#### Message Access (MAP)
| Tool | Description |
|------|-------------|
| `bt_messages_folders` | List message folders |
| `bt_messages_list` | List messages in folder |
| `bt_messages_get` | Download message content |
| `bt_messages_send` | Send SMS via MAP |
#### Transfer Management
| Tool | Description |
|------|-------------|
| `bt_obex_transfer_status` | Check transfer progress |
| `bt_obex_transfer_cancel` | Cancel active transfer |
**Total: 61 tools**
## MCP Resources
@ -252,10 +338,10 @@ Live state queries without tool calls:
│ │
│ ┌──────────────────── FastMCP Server ──────────────────────┐ │
│ │ │ │
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ │ │ Adapter │ │ Device │ │ Audio │ │ BLE │
│ │ │ Tools │ │ Tools │ │ Tools │ │ Tools │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│ │ │ Adapter │ │ Device │ │ Audio │ │ BLE │ OBEX ││
│ │ │ Tools │ │ Tools │ │ Tools │ │ Tools │ │ Tools │
│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
│ │ │ │ │ │ │ │
│ │ ┌────┴───────────┴───────────┴───────────┴────┐ │ │
│ │ │ Pairing Agent (Agent1) │ │ │
@ -343,6 +429,7 @@ mcbluetooth/
│ ├── __init__.py
│ ├── server.py # FastMCP server entry point
│ ├── dbus_client.py # BlueZ D-Bus wrapper
│ ├── obex_client.py # obexd D-Bus wrapper
│ ├── audio.py # PipeWire/Pulse integration
│ ├── agent.py # Pairing agent (Agent1)
│ └── tools/
@ -350,7 +437,8 @@ mcbluetooth/
│ ├── device.py # Device management + pairing
│ ├── audio.py # Audio profile tools
│ ├── ble.py # BLE/GATT tools
│ └── monitor.py # btmon integration
│ ├── monitor.py # btmon integration
│ └── obex.py # OBEX profile tools
├── tests/
├── pyproject.toml
└── README.md
@ -364,9 +452,9 @@ mcbluetooth/
- [x] Audio profiles (A2DP/HFP)
- [x] BLE/GATT operations
- [x] btmon packet capture
- [ ] OBEX file transfer (OPP/FTP)
- [ ] Phonebook access (PBAP)
- [ ] Message access (MAP)
- [x] OBEX file transfer (OPP/FTP)
- [x] Phonebook access (PBAP)
- [x] Message access (MAP)
- [ ] Bluetooth Mesh (experimental)
## Related Projects

View File

@ -0,0 +1,887 @@
"""obexd D-Bus client wrapper using dbus-fast.
This module provides an async interface to the BlueZ OBEX daemon via D-Bus.
obexd handles Object Exchange (OBEX) profiles:
- OPP (Object Push Profile): Simple file transfer
- FTP (File Transfer Profile): File browsing and transfer
- PBAP (Phonebook Access Profile): Contact/phonebook access
- MAP (Message Access Profile): SMS/MMS/email access
Object paths follow this pattern:
- /org/bluez/obex - Client
- /org/bluez/obex/client/sessionN - Session
- /org/bluez/obex/client/sessionN/transferM - Transfer
Note: obexd runs as a user service and uses the SESSION bus, not system bus.
"""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any, Literal
from dbus_fast import BusType, Variant
from dbus_fast.aio import MessageBus, ProxyInterface
# obexd constants
OBEX_SERVICE = "org.bluez.obex"
OBEX_CLIENT_PATH = "/org/bluez/obex"
OBEX_CLIENT_IFACE = "org.bluez.obex.Client1"
OBEX_SESSION_IFACE = "org.bluez.obex.Session1"
OBEX_TRANSFER_IFACE = "org.bluez.obex.Transfer1"
OBEX_OPP_IFACE = "org.bluez.obex.ObjectPush1"
OBEX_FTP_IFACE = "org.bluez.obex.FileTransfer1"
OBEX_PBAP_IFACE = "org.bluez.obex.PhonebookAccess1"
OBEX_MAP_IFACE = "org.bluez.obex.MessageAccess1"
DBUS_PROPS_IFACE = "org.freedesktop.DBus.Properties"
# OBEX target UUIDs
OBEX_TARGETS = {
"opp": "00001105-0000-1000-8000-00805f9b34fb", # Object Push
"ftp": "00001106-0000-1000-8000-00805f9b34fb", # File Transfer
"pbap": "0000112f-0000-1000-8000-00805f9b34fb", # PBAP PCE
"map": "00001132-0000-1000-8000-00805f9b34fb", # MAP MCE
}
# Target type alias
ObexTarget = Literal["opp", "ftp", "pbap", "map"]
def unwrap_variant(value: Any) -> Any:
"""Recursively unwrap dbus-fast Variant objects to plain Python types."""
if isinstance(value, Variant):
return unwrap_variant(value.value)
elif isinstance(value, dict):
return {k: unwrap_variant(v) for k, v in value.items()}
elif isinstance(value, list):
return [unwrap_variant(v) for v in value]
return value
def generate_session_id(address: str, target: str) -> str:
"""Generate a friendly session ID like 'ftp_C87B235568E8'."""
addr_short = address.replace(":", "").upper()
return f"{target}_{addr_short}"
@dataclass
class ObexSession:
"""Information about an active OBEX session."""
session_id: str
path: str
address: str
target: ObexTarget
source: str = "" # Local adapter address
root: str = "" # Root folder (FTP only)
created: float = field(default_factory=time.time)
@dataclass
class ObexTransfer:
"""Information about an OBEX transfer."""
path: str
status: str # "queued", "active", "suspended", "complete", "error"
name: str = ""
size: int = 0
transferred: int = 0
filename: str = ""
session: str = ""
@dataclass
class FolderEntry:
"""Entry in an FTP folder listing."""
name: str
type: str # "folder" or "file"
size: int = 0
permissions: str = ""
modified: str = ""
@dataclass
class PhonebookEntry:
"""Entry in a phonebook listing."""
handle: str
name: str = ""
phone: str = ""
@dataclass
class MessageEntry:
"""Entry in a message listing."""
handle: str
subject: str = ""
datetime: str = ""
sender_name: str = ""
sender_address: str = ""
type: str = "" # SMS, MMS, EMAIL
read: bool = False
class ObexClient:
"""Async client for obexd D-Bus API."""
def __init__(self):
self._bus: MessageBus | None = None
self._client: ProxyInterface | None = None
self._sessions: dict[str, ObexSession] = {}
async def connect(self) -> None:
"""Connect to the session D-Bus."""
if self._bus is not None:
return
self._bus = await MessageBus(bus_type=BusType.SESSION).connect()
# Get the Client interface
introspection = await self._bus.introspect(OBEX_SERVICE, OBEX_CLIENT_PATH)
proxy = self._bus.get_proxy_object(OBEX_SERVICE, OBEX_CLIENT_PATH, introspection)
self._client = proxy.get_interface(OBEX_CLIENT_IFACE)
async def disconnect(self) -> None:
"""Disconnect from D-Bus and clean up sessions."""
# Remove all active sessions
for session_id in list(self._sessions.keys()):
try:
await self.remove_session(session_id)
except Exception:
pass
if self._bus:
self._bus.disconnect()
self._bus = None
self._client = None
async def _ensure_connected(self) -> None:
"""Ensure we're connected to D-Bus."""
if self._bus is None:
await self.connect()
async def _get_interface(self, path: str, interface: str) -> ProxyInterface:
"""Get a proxy interface for an object."""
await self._ensure_connected()
assert self._bus is not None
introspection = await self._bus.introspect(OBEX_SERVICE, path)
proxy = self._bus.get_proxy_object(OBEX_SERVICE, path, introspection)
return proxy.get_interface(interface)
async def _get_property(self, path: str, interface: str, prop: str) -> Any:
"""Get a single property from an object."""
props_iface = await self._get_interface(path, DBUS_PROPS_IFACE)
result = await props_iface.call_get(interface, prop)
return unwrap_variant(result)
async def _get_all_properties(self, path: str, interface: str) -> dict[str, Any]:
"""Get all properties from an object."""
props_iface = await self._get_interface(path, DBUS_PROPS_IFACE)
props = await props_iface.call_get_all(interface)
return {k: unwrap_variant(v) for k, v in props.items()}
# ==================== Session Management ====================
async def create_session(self, address: str, target: ObexTarget) -> ObexSession:
"""Create an OBEX session to a device.
Args:
address: Bluetooth device address
target: OBEX target profile (opp, ftp, pbap, map)
Returns:
ObexSession with session details
"""
await self._ensure_connected()
assert self._client is not None
# Build session options
options: dict[str, Variant] = {
"Target": Variant("s", target),
}
# Create the session
session_path = await self._client.call_create_session(address, options)
# Get session properties
props = await self._get_all_properties(session_path, OBEX_SESSION_IFACE)
# Generate session ID and store
session_id = generate_session_id(address, target)
# Handle duplicate sessions by appending timestamp
if session_id in self._sessions:
session_id = f"{session_id}_{int(time.time())}"
session = ObexSession(
session_id=session_id,
path=session_path,
address=address,
target=target,
source=props.get("Source", ""),
root=props.get("Root", ""),
)
self._sessions[session_id] = session
return session
async def remove_session(self, session_id: str) -> None:
"""Remove/close an OBEX session.
Args:
session_id: Session ID from create_session
"""
if session_id not in self._sessions:
raise ValueError(f"Unknown session: {session_id}")
session = self._sessions[session_id]
await self._ensure_connected()
assert self._client is not None
await self._client.call_remove_session(session.path)
del self._sessions[session_id]
def get_session(self, session_id: str) -> ObexSession | None:
"""Get a session by ID."""
return self._sessions.get(session_id)
def list_sessions(self) -> list[ObexSession]:
"""List all active sessions."""
return list(self._sessions.values())
async def get_session_info(self, session_id: str) -> dict[str, Any]:
"""Get detailed session info from D-Bus."""
if session_id not in self._sessions:
raise ValueError(f"Unknown session: {session_id}")
session = self._sessions[session_id]
props = await self._get_all_properties(session.path, OBEX_SESSION_IFACE)
return {
"session_id": session_id,
"path": session.path,
"address": session.address,
"target": session.target,
**props,
}
# ==================== Transfer Operations ====================
async def get_transfer_status(self, transfer_path: str) -> ObexTransfer:
"""Get the status of a transfer."""
props = await self._get_all_properties(transfer_path, OBEX_TRANSFER_IFACE)
return ObexTransfer(
path=transfer_path,
status=props.get("Status", "unknown"),
name=props.get("Name", ""),
size=props.get("Size", 0),
transferred=props.get("Transferred", 0),
filename=props.get("Filename", ""),
session=props.get("Session", ""),
)
async def wait_for_transfer(
self,
transfer_path: str,
timeout: int = 300,
poll_interval: float = 0.5,
) -> ObexTransfer:
"""Wait for a transfer to complete.
Args:
transfer_path: D-Bus path of the transfer
timeout: Maximum wait time in seconds
poll_interval: How often to check status
Returns:
Final transfer status
"""
start_time = time.time()
while True:
transfer = await self.get_transfer_status(transfer_path)
if transfer.status in ("complete", "error"):
return transfer
if time.time() - start_time > timeout:
raise TimeoutError(f"Transfer timed out after {timeout}s")
await asyncio.sleep(poll_interval)
async def cancel_transfer(self, transfer_path: str) -> None:
"""Cancel an active transfer."""
iface = await self._get_interface(transfer_path, OBEX_TRANSFER_IFACE)
await iface.call_cancel()
# ==================== OPP (Object Push) Operations ====================
async def opp_send_file(self, session_id: str, local_path: str) -> str:
"""Send a file via Object Push Profile.
Args:
session_id: Session ID (must be OPP session)
local_path: Path to local file to send
Returns:
Transfer path for monitoring
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "opp":
raise ValueError(f"Session {session_id} is not an OPP session")
iface = await self._get_interface(session.path, OBEX_OPP_IFACE)
transfer_path, _ = await iface.call_send_file(local_path)
return transfer_path
async def opp_pull_business_card(self, session_id: str, target_path: str) -> str:
"""Pull the default business card (vCard) from device.
Args:
session_id: Session ID (must be OPP session)
target_path: Local path to save the vCard
Returns:
Transfer path for monitoring
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "opp":
raise ValueError(f"Session {session_id} is not an OPP session")
iface = await self._get_interface(session.path, OBEX_OPP_IFACE)
transfer_path, _ = await iface.call_pull_business_card(target_path)
return transfer_path
async def opp_exchange_business_cards(
self, session_id: str, client_vcard: str, target_path: str
) -> str:
"""Exchange business cards with device.
Args:
session_id: Session ID
client_vcard: Path to local vCard to send
target_path: Path to save received vCard
Returns:
Transfer path for monitoring
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "opp":
raise ValueError(f"Session {session_id} is not an OPP session")
iface = await self._get_interface(session.path, OBEX_OPP_IFACE)
transfer_path, _ = await iface.call_exchange_business_cards(client_vcard, target_path)
return transfer_path
# ==================== FTP (File Transfer) Operations ====================
async def ftp_change_folder(self, session_id: str, folder: str) -> None:
"""Change the current folder.
Args:
session_id: Session ID (must be FTP session)
folder: Folder name, ".." for parent, or "/" for root
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "ftp":
raise ValueError(f"Session {session_id} is not an FTP session")
iface = await self._get_interface(session.path, OBEX_FTP_IFACE)
# obexd uses different methods for navigation
if folder == "/":
# Go to root
await iface.call_change_folder("/")
elif folder == "..":
# Go to parent
await iface.call_change_folder("..")
else:
await iface.call_change_folder(folder)
async def ftp_list_folder(self, session_id: str) -> list[FolderEntry]:
"""List contents of current folder.
Args:
session_id: Session ID (must be FTP session)
Returns:
List of folder entries
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "ftp":
raise ValueError(f"Session {session_id} is not an FTP session")
iface = await self._get_interface(session.path, OBEX_FTP_IFACE)
result = await iface.call_list_folder()
entries = []
for item in result:
item = unwrap_variant(item)
entries.append(
FolderEntry(
name=item.get("Name", ""),
type=item.get("Type", "file"),
size=item.get("Size", 0),
permissions=item.get("User-perm", ""),
modified=item.get("Modified", ""),
)
)
return entries
async def ftp_get_file(
self, session_id: str, remote_path: str, local_path: str
) -> str:
"""Download a file from the device.
Args:
session_id: Session ID (must be FTP session)
remote_path: Remote file name in current folder
local_path: Local path to save the file
Returns:
Transfer path for monitoring
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "ftp":
raise ValueError(f"Session {session_id} is not an FTP session")
iface = await self._get_interface(session.path, OBEX_FTP_IFACE)
transfer_path, _ = await iface.call_get_file(local_path, remote_path)
return transfer_path
async def ftp_put_file(
self, session_id: str, local_path: str, remote_path: str
) -> str:
"""Upload a file to the device.
Args:
session_id: Session ID (must be FTP session)
local_path: Local file path
remote_path: Remote file name in current folder
Returns:
Transfer path for monitoring
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "ftp":
raise ValueError(f"Session {session_id} is not an FTP session")
iface = await self._get_interface(session.path, OBEX_FTP_IFACE)
transfer_path, _ = await iface.call_put_file(local_path, remote_path)
return transfer_path
async def ftp_copy_file(
self, session_id: str, source: str, destination: str
) -> None:
"""Copy a file on the device.
Args:
session_id: Session ID (must be FTP session)
source: Source file name
destination: Destination file name
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "ftp":
raise ValueError(f"Session {session_id} is not an FTP session")
iface = await self._get_interface(session.path, OBEX_FTP_IFACE)
await iface.call_copy_file(source, destination)
async def ftp_move_file(
self, session_id: str, source: str, destination: str
) -> None:
"""Move/rename a file on the device.
Args:
session_id: Session ID (must be FTP session)
source: Source file name
destination: Destination file name
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "ftp":
raise ValueError(f"Session {session_id} is not an FTP session")
iface = await self._get_interface(session.path, OBEX_FTP_IFACE)
await iface.call_move_file(source, destination)
async def ftp_delete(self, session_id: str, name: str) -> None:
"""Delete a file or folder on the device.
Args:
session_id: Session ID (must be FTP session)
name: File or folder name to delete
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "ftp":
raise ValueError(f"Session {session_id} is not an FTP session")
iface = await self._get_interface(session.path, OBEX_FTP_IFACE)
await iface.call_delete(name)
async def ftp_create_folder(self, session_id: str, name: str) -> None:
"""Create a folder on the device.
Args:
session_id: Session ID (must be FTP session)
name: Folder name to create
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "ftp":
raise ValueError(f"Session {session_id} is not an FTP session")
iface = await self._get_interface(session.path, OBEX_FTP_IFACE)
await iface.call_create_folder(name)
# ==================== PBAP (Phonebook) Operations ====================
async def pbap_select(self, session_id: str, location: str, phonebook: str) -> None:
"""Select a phonebook for subsequent operations.
Args:
session_id: Session ID (must be PBAP session)
location: "int" (internal) or "sim" (SIM card)
phonebook: Phonebook type - "pb" (contacts), "ich" (incoming calls),
"och" (outgoing calls), "mch" (missed calls), "cch" (combined)
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "pbap":
raise ValueError(f"Session {session_id} is not a PBAP session")
iface = await self._get_interface(session.path, OBEX_PBAP_IFACE)
await iface.call_select(location, phonebook)
async def pbap_pull_all(self, session_id: str, target_path: str) -> str:
"""Download the entire phonebook as vCard.
Args:
session_id: Session ID (must be PBAP session)
target_path: Local path to save the vCards
Returns:
Transfer path for monitoring
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "pbap":
raise ValueError(f"Session {session_id} is not a PBAP session")
iface = await self._get_interface(session.path, OBEX_PBAP_IFACE)
transfer_path, _ = await iface.call_pull_all(target_path, {})
return transfer_path
async def pbap_list(self, session_id: str) -> list[PhonebookEntry]:
"""List phonebook entries (handles and names only).
Args:
session_id: Session ID (must be PBAP session)
Returns:
List of phonebook entries with handles
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "pbap":
raise ValueError(f"Session {session_id} is not a PBAP session")
iface = await self._get_interface(session.path, OBEX_PBAP_IFACE)
result = await iface.call_list({})
entries = []
for item in result:
item = unwrap_variant(item)
entries.append(
PhonebookEntry(
handle=item.get("Handle", ""),
name=item.get("Name", ""),
)
)
return entries
async def pbap_pull(self, session_id: str, handle: str, target_path: str) -> str:
"""Download a specific phonebook entry.
Args:
session_id: Session ID (must be PBAP session)
handle: Entry handle from pbap_list
target_path: Local path to save the vCard
Returns:
Transfer path for monitoring
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "pbap":
raise ValueError(f"Session {session_id} is not a PBAP session")
iface = await self._get_interface(session.path, OBEX_PBAP_IFACE)
transfer_path, _ = await iface.call_pull(handle, target_path, {})
return transfer_path
async def pbap_search(
self, session_id: str, field: str, value: str
) -> list[PhonebookEntry]:
"""Search the phonebook.
Args:
session_id: Session ID (must be PBAP session)
field: Field to search - "name", "number", or "sound"
value: Value to search for
Returns:
List of matching entries
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "pbap":
raise ValueError(f"Session {session_id} is not a PBAP session")
iface = await self._get_interface(session.path, OBEX_PBAP_IFACE)
result = await iface.call_search(field, value, {})
entries = []
for item in result:
item = unwrap_variant(item)
entries.append(
PhonebookEntry(
handle=item.get("Handle", ""),
name=item.get("Name", ""),
)
)
return entries
async def pbap_get_size(self, session_id: str) -> int:
"""Get the number of entries in the selected phonebook.
Args:
session_id: Session ID (must be PBAP session)
Returns:
Number of entries
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "pbap":
raise ValueError(f"Session {session_id} is not a PBAP session")
iface = await self._get_interface(session.path, OBEX_PBAP_IFACE)
size = await iface.call_get_size()
return size
# ==================== MAP (Message Access) Operations ====================
async def map_set_folder(self, session_id: str, folder: str) -> None:
"""Set the current message folder.
Args:
session_id: Session ID (must be MAP session)
folder: Folder path (e.g., "/telecom/msg/inbox")
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "map":
raise ValueError(f"Session {session_id} is not a MAP session")
iface = await self._get_interface(session.path, OBEX_MAP_IFACE)
await iface.call_set_folder(folder)
async def map_list_folders(self, session_id: str) -> list[dict[str, Any]]:
"""List message folders.
Args:
session_id: Session ID (must be MAP session)
Returns:
List of folder dictionaries
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "map":
raise ValueError(f"Session {session_id} is not a MAP session")
iface = await self._get_interface(session.path, OBEX_MAP_IFACE)
result = await iface.call_list_folders({})
folders = []
for item in result:
folders.append(unwrap_variant(item))
return folders
async def map_list_messages(
self,
session_id: str,
folder: str,
filters: dict[str, Any] | None = None,
) -> list[MessageEntry]:
"""List messages in a folder.
Args:
session_id: Session ID (must be MAP session)
folder: Folder name
filters: Optional filters (max_count, start_offset, etc.)
Returns:
List of message entries
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "map":
raise ValueError(f"Session {session_id} is not a MAP session")
iface = await self._get_interface(session.path, OBEX_MAP_IFACE)
# Convert filters to D-Bus format
dbus_filters: dict[str, Variant] = {}
if filters:
for key, value in filters.items():
if isinstance(value, bool):
dbus_filters[key] = Variant("b", value)
elif isinstance(value, int):
dbus_filters[key] = Variant("u", value)
elif isinstance(value, str):
dbus_filters[key] = Variant("s", value)
result = await iface.call_list_messages(folder, dbus_filters)
entries = []
for item in result:
item = unwrap_variant(item)
entries.append(
MessageEntry(
handle=item.get("Handle", ""),
subject=item.get("Subject", ""),
datetime=item.get("DateTime", ""),
sender_name=item.get("SenderName", ""),
sender_address=item.get("SenderAddress", ""),
type=item.get("Type", ""),
read=item.get("Read", False),
)
)
return entries
async def map_get_message(
self, session_id: str, handle: str, target_path: str, attachment: bool = False
) -> str:
"""Download a specific message.
Args:
session_id: Session ID (must be MAP session)
handle: Message handle
target_path: Local path to save the message
attachment: Whether to include attachments
Returns:
Transfer path for monitoring
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "map":
raise ValueError(f"Session {session_id} is not a MAP session")
iface = await self._get_interface(session.path, OBEX_MAP_IFACE)
transfer_path, _ = await iface.call_get_message(
handle, target_path, {"Attachment": Variant("b", attachment)}
)
return transfer_path
async def map_push_message(
self, session_id: str, folder: str, message_path: str
) -> str:
"""Push/send a message.
Args:
session_id: Session ID (must be MAP session)
folder: Target folder
message_path: Path to message file (bMessage format)
Returns:
Message handle
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "map":
raise ValueError(f"Session {session_id} is not a MAP session")
iface = await self._get_interface(session.path, OBEX_MAP_IFACE)
handle = await iface.call_push_message(message_path, folder, {})
return handle
async def map_update_inbox(self, session_id: str) -> None:
"""Request the server to refresh the inbox.
Args:
session_id: Session ID (must be MAP session)
"""
session = self._sessions.get(session_id)
if not session:
raise ValueError(f"Unknown session: {session_id}")
if session.target != "map":
raise ValueError(f"Session {session_id} is not a MAP session")
iface = await self._get_interface(session.path, OBEX_MAP_IFACE)
await iface.call_update_inbox()
# Global client instance
_client: ObexClient | None = None
async def get_obex_client() -> ObexClient:
"""Get or create the global OBEX client."""
global _client
if _client is None:
_client = ObexClient()
await _client.connect()
return _client
async def cleanup_stale_sessions() -> None:
"""Clean up sessions that may be stale."""
if _client:
# Check each session is still valid
for session_id in list(_client._sessions.keys()):
try:
await _client.get_session_info(session_id)
except Exception:
# Session no longer exists, remove from tracking
del _client._sessions[session_id]

View File

@ -3,7 +3,7 @@
from fastmcp import FastMCP
from mcbluetooth import resources
from mcbluetooth.tools import adapter, audio, ble, device, monitor
from mcbluetooth.tools import adapter, audio, ble, device, monitor, obex
mcp = FastMCP(
name="mcbluetooth",
@ -14,6 +14,7 @@ This server provides comprehensive control over the Linux Bluetooth stack:
- Device discovery and management (scan, pair, connect)
- Audio profiles (A2DP, HFP) with PipeWire/PulseAudio integration
- BLE/GATT services (read/write characteristics, notifications)
- OBEX profiles (file transfer, phonebook access, messages)
## Resources (live state queries)
- bluetooth://adapters - All Bluetooth adapters
@ -44,6 +45,7 @@ device.register_tools(mcp)
audio.register_tools(mcp)
ble.register_tools(mcp)
monitor.register_tools(mcp)
obex.register_tools(mcp)
def main():

View File

@ -1,5 +1,5 @@
"""MCP tool modules for Bluetooth management."""
from mcbluetooth.tools import adapter, audio, ble, device, monitor
from mcbluetooth.tools import adapter, audio, ble, device, monitor, obex
__all__ = ["adapter", "device", "audio", "ble", "monitor"]
__all__ = ["adapter", "device", "audio", "ble", "monitor", "obex"]

File diff suppressed because it is too large Load Diff