diff --git a/README.md b/README.md index be4acf2..3a6ace7 100644 --- a/README.md +++ b/README.md @@ -39,6 +39,7 @@ - **Smart Pairing Agent** — Handles PIN codes, passkeys, and confirmations automatically - **Protocol Analysis** — Capture and analyze Bluetooth traffic with btmon integration - **Audio Intelligence** — PipeWire/PulseAudio integration for seamless audio control +- **OBEX Profiles** — File transfer, phonebook access, and message access ## Example Conversation @@ -115,6 +116,24 @@ For HCI packet capture (btmon), additional permissions are needed: sudo setcap cap_net_raw+ep /usr/bin/btmon ``` +### OBEX Setup (for file transfer, phonebook, messages) + +OBEX features require the `obexd` daemon: + +```bash +# Install obexd (if not included with BlueZ) +# Arch Linux +sudo pacman -S bluez-obex + +# Debian/Ubuntu +sudo apt install bluez-obex + +# Check status +bt_obex_status # Use this tool to verify setup +``` + +The `bt_obex_status` tool will check if obexd is installed and running, and provide guidance if setup is needed. + ## Features ### Adapter Management @@ -165,6 +184,24 @@ Capture and analyze Bluetooth traffic for debugging and reverse engineering. "Analyze the capture file for errors" ``` +### OBEX File Transfer +Send files, browse phone storage, and download photos using OBEX profiles. + +``` +"Send this PDF to my phone" +"Browse the files on my phone's storage" +"Download all photos from DCIM folder" +``` + +### Phonebook & Messages +Access contacts and SMS messages from connected phones via PBAP and MAP profiles. + +``` +"Download my phone's contacts" +"Search for John in the phonebook" +"List unread text messages" +``` + ## Tools Reference ### Adapter Tools (6) @@ -225,7 +262,56 @@ Capture and analyze Bluetooth traffic for debugging and reverse engineering. | `bt_capture_analyze` | Statistical analysis of capture | | `bt_capture_read_raw` | Human-readable packet decode | -**Total: 38 tools** +### OBEX Tools (23) + +#### Setup & Session Management +| Tool | Description | +|------|-------------| +| `bt_obex_status` | Check obexd daemon status and requirements | +| `bt_obex_start_daemon` | Start the obexd daemon | +| `bt_obex_connect` | Create OBEX session (OPP/FTP/PBAP/MAP) | +| `bt_obex_disconnect` | Close OBEX session | +| `bt_obex_sessions` | List active OBEX sessions | + +#### Object Push (OPP) +| Tool | Description | +|------|-------------| +| `bt_obex_send_file` | Send file to device | +| `bt_obex_get_vcard` | Pull business card from device | + +#### File Transfer (FTP) +| Tool | Description | +|------|-------------| +| `bt_obex_browse` | List files/folders on device | +| `bt_obex_get` | Download file from device | +| `bt_obex_put` | Upload file to device | +| `bt_obex_delete` | Delete file/folder on device | +| `bt_obex_mkdir` | Create folder on device | + +#### Phonebook Access (PBAP) +| Tool | Description | +|------|-------------| +| `bt_phonebook_pull` | Download entire phonebook | +| `bt_phonebook_list` | List phonebook entries | +| `bt_phonebook_get` | Download single contact | +| `bt_phonebook_search` | Search phonebook | +| `bt_phonebook_count` | Count phonebook entries | + +#### Message Access (MAP) +| Tool | Description | +|------|-------------| +| `bt_messages_folders` | List message folders | +| `bt_messages_list` | List messages in folder | +| `bt_messages_get` | Download message content | +| `bt_messages_send` | Send SMS via MAP | + +#### Transfer Management +| Tool | Description | +|------|-------------| +| `bt_obex_transfer_status` | Check transfer progress | +| `bt_obex_transfer_cancel` | Cancel active transfer | + +**Total: 61 tools** ## MCP Resources @@ -252,10 +338,10 @@ Live state queries without tool calls: │ │ │ ┌──────────────────── FastMCP Server ──────────────────────┐ │ │ │ │ │ -│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │ │ -│ │ │ Adapter │ │ Device │ │ Audio │ │ BLE │ │ │ -│ │ │ Tools │ │ Tools │ │ Tools │ │ Tools │ │ │ -│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ +│ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐│ +│ │ │ Adapter │ │ Device │ │ Audio │ │ BLE │ │ OBEX ││ +│ │ │ Tools │ │ Tools │ │ Tools │ │ Tools │ │ Tools ││ +│ │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘│ │ │ │ │ │ │ │ │ │ │ ┌────┴───────────┴───────────┴───────────┴────┐ │ │ │ │ │ Pairing Agent (Agent1) │ │ │ @@ -343,6 +429,7 @@ mcbluetooth/ │ ├── __init__.py │ ├── server.py # FastMCP server entry point │ ├── dbus_client.py # BlueZ D-Bus wrapper +│ ├── obex_client.py # obexd D-Bus wrapper │ ├── audio.py # PipeWire/Pulse integration │ ├── agent.py # Pairing agent (Agent1) │ └── tools/ @@ -350,7 +437,8 @@ mcbluetooth/ │ ├── device.py # Device management + pairing │ ├── audio.py # Audio profile tools │ ├── ble.py # BLE/GATT tools -│ └── monitor.py # btmon integration +│ ├── monitor.py # btmon integration +│ └── obex.py # OBEX profile tools ├── tests/ ├── pyproject.toml └── README.md @@ -364,9 +452,9 @@ mcbluetooth/ - [x] Audio profiles (A2DP/HFP) - [x] BLE/GATT operations - [x] btmon packet capture -- [ ] OBEX file transfer (OPP/FTP) -- [ ] Phonebook access (PBAP) -- [ ] Message access (MAP) +- [x] OBEX file transfer (OPP/FTP) +- [x] Phonebook access (PBAP) +- [x] Message access (MAP) - [ ] Bluetooth Mesh (experimental) ## Related Projects diff --git a/src/mcbluetooth/obex_client.py b/src/mcbluetooth/obex_client.py new file mode 100644 index 0000000..096207b --- /dev/null +++ b/src/mcbluetooth/obex_client.py @@ -0,0 +1,887 @@ +"""obexd D-Bus client wrapper using dbus-fast. + +This module provides an async interface to the BlueZ OBEX daemon via D-Bus. +obexd handles Object Exchange (OBEX) profiles: + +- OPP (Object Push Profile): Simple file transfer +- FTP (File Transfer Profile): File browsing and transfer +- PBAP (Phonebook Access Profile): Contact/phonebook access +- MAP (Message Access Profile): SMS/MMS/email access + +Object paths follow this pattern: +- /org/bluez/obex - Client +- /org/bluez/obex/client/sessionN - Session +- /org/bluez/obex/client/sessionN/transferM - Transfer + +Note: obexd runs as a user service and uses the SESSION bus, not system bus. +""" + +from __future__ import annotations + +import asyncio +import time +from dataclasses import dataclass, field +from typing import Any, Literal + +from dbus_fast import BusType, Variant +from dbus_fast.aio import MessageBus, ProxyInterface + +# obexd constants +OBEX_SERVICE = "org.bluez.obex" +OBEX_CLIENT_PATH = "/org/bluez/obex" +OBEX_CLIENT_IFACE = "org.bluez.obex.Client1" +OBEX_SESSION_IFACE = "org.bluez.obex.Session1" +OBEX_TRANSFER_IFACE = "org.bluez.obex.Transfer1" +OBEX_OPP_IFACE = "org.bluez.obex.ObjectPush1" +OBEX_FTP_IFACE = "org.bluez.obex.FileTransfer1" +OBEX_PBAP_IFACE = "org.bluez.obex.PhonebookAccess1" +OBEX_MAP_IFACE = "org.bluez.obex.MessageAccess1" + +DBUS_PROPS_IFACE = "org.freedesktop.DBus.Properties" + +# OBEX target UUIDs +OBEX_TARGETS = { + "opp": "00001105-0000-1000-8000-00805f9b34fb", # Object Push + "ftp": "00001106-0000-1000-8000-00805f9b34fb", # File Transfer + "pbap": "0000112f-0000-1000-8000-00805f9b34fb", # PBAP PCE + "map": "00001132-0000-1000-8000-00805f9b34fb", # MAP MCE +} + +# Target type alias +ObexTarget = Literal["opp", "ftp", "pbap", "map"] + + +def unwrap_variant(value: Any) -> Any: + """Recursively unwrap dbus-fast Variant objects to plain Python types.""" + if isinstance(value, Variant): + return unwrap_variant(value.value) + elif isinstance(value, dict): + return {k: unwrap_variant(v) for k, v in value.items()} + elif isinstance(value, list): + return [unwrap_variant(v) for v in value] + return value + + +def generate_session_id(address: str, target: str) -> str: + """Generate a friendly session ID like 'ftp_C87B235568E8'.""" + addr_short = address.replace(":", "").upper() + return f"{target}_{addr_short}" + + +@dataclass +class ObexSession: + """Information about an active OBEX session.""" + + session_id: str + path: str + address: str + target: ObexTarget + source: str = "" # Local adapter address + root: str = "" # Root folder (FTP only) + created: float = field(default_factory=time.time) + + +@dataclass +class ObexTransfer: + """Information about an OBEX transfer.""" + + path: str + status: str # "queued", "active", "suspended", "complete", "error" + name: str = "" + size: int = 0 + transferred: int = 0 + filename: str = "" + session: str = "" + + +@dataclass +class FolderEntry: + """Entry in an FTP folder listing.""" + + name: str + type: str # "folder" or "file" + size: int = 0 + permissions: str = "" + modified: str = "" + + +@dataclass +class PhonebookEntry: + """Entry in a phonebook listing.""" + + handle: str + name: str = "" + phone: str = "" + + +@dataclass +class MessageEntry: + """Entry in a message listing.""" + + handle: str + subject: str = "" + datetime: str = "" + sender_name: str = "" + sender_address: str = "" + type: str = "" # SMS, MMS, EMAIL + read: bool = False + + +class ObexClient: + """Async client for obexd D-Bus API.""" + + def __init__(self): + self._bus: MessageBus | None = None + self._client: ProxyInterface | None = None + self._sessions: dict[str, ObexSession] = {} + + async def connect(self) -> None: + """Connect to the session D-Bus.""" + if self._bus is not None: + return + + self._bus = await MessageBus(bus_type=BusType.SESSION).connect() + + # Get the Client interface + introspection = await self._bus.introspect(OBEX_SERVICE, OBEX_CLIENT_PATH) + proxy = self._bus.get_proxy_object(OBEX_SERVICE, OBEX_CLIENT_PATH, introspection) + self._client = proxy.get_interface(OBEX_CLIENT_IFACE) + + async def disconnect(self) -> None: + """Disconnect from D-Bus and clean up sessions.""" + # Remove all active sessions + for session_id in list(self._sessions.keys()): + try: + await self.remove_session(session_id) + except Exception: + pass + + if self._bus: + self._bus.disconnect() + self._bus = None + self._client = None + + async def _ensure_connected(self) -> None: + """Ensure we're connected to D-Bus.""" + if self._bus is None: + await self.connect() + + async def _get_interface(self, path: str, interface: str) -> ProxyInterface: + """Get a proxy interface for an object.""" + await self._ensure_connected() + assert self._bus is not None + + introspection = await self._bus.introspect(OBEX_SERVICE, path) + proxy = self._bus.get_proxy_object(OBEX_SERVICE, path, introspection) + return proxy.get_interface(interface) + + async def _get_property(self, path: str, interface: str, prop: str) -> Any: + """Get a single property from an object.""" + props_iface = await self._get_interface(path, DBUS_PROPS_IFACE) + result = await props_iface.call_get(interface, prop) + return unwrap_variant(result) + + async def _get_all_properties(self, path: str, interface: str) -> dict[str, Any]: + """Get all properties from an object.""" + props_iface = await self._get_interface(path, DBUS_PROPS_IFACE) + props = await props_iface.call_get_all(interface) + return {k: unwrap_variant(v) for k, v in props.items()} + + # ==================== Session Management ==================== + + async def create_session(self, address: str, target: ObexTarget) -> ObexSession: + """Create an OBEX session to a device. + + Args: + address: Bluetooth device address + target: OBEX target profile (opp, ftp, pbap, map) + + Returns: + ObexSession with session details + """ + await self._ensure_connected() + assert self._client is not None + + # Build session options + options: dict[str, Variant] = { + "Target": Variant("s", target), + } + + # Create the session + session_path = await self._client.call_create_session(address, options) + + # Get session properties + props = await self._get_all_properties(session_path, OBEX_SESSION_IFACE) + + # Generate session ID and store + session_id = generate_session_id(address, target) + + # Handle duplicate sessions by appending timestamp + if session_id in self._sessions: + session_id = f"{session_id}_{int(time.time())}" + + session = ObexSession( + session_id=session_id, + path=session_path, + address=address, + target=target, + source=props.get("Source", ""), + root=props.get("Root", ""), + ) + + self._sessions[session_id] = session + return session + + async def remove_session(self, session_id: str) -> None: + """Remove/close an OBEX session. + + Args: + session_id: Session ID from create_session + """ + if session_id not in self._sessions: + raise ValueError(f"Unknown session: {session_id}") + + session = self._sessions[session_id] + await self._ensure_connected() + assert self._client is not None + + await self._client.call_remove_session(session.path) + del self._sessions[session_id] + + def get_session(self, session_id: str) -> ObexSession | None: + """Get a session by ID.""" + return self._sessions.get(session_id) + + def list_sessions(self) -> list[ObexSession]: + """List all active sessions.""" + return list(self._sessions.values()) + + async def get_session_info(self, session_id: str) -> dict[str, Any]: + """Get detailed session info from D-Bus.""" + if session_id not in self._sessions: + raise ValueError(f"Unknown session: {session_id}") + + session = self._sessions[session_id] + props = await self._get_all_properties(session.path, OBEX_SESSION_IFACE) + return { + "session_id": session_id, + "path": session.path, + "address": session.address, + "target": session.target, + **props, + } + + # ==================== Transfer Operations ==================== + + async def get_transfer_status(self, transfer_path: str) -> ObexTransfer: + """Get the status of a transfer.""" + props = await self._get_all_properties(transfer_path, OBEX_TRANSFER_IFACE) + + return ObexTransfer( + path=transfer_path, + status=props.get("Status", "unknown"), + name=props.get("Name", ""), + size=props.get("Size", 0), + transferred=props.get("Transferred", 0), + filename=props.get("Filename", ""), + session=props.get("Session", ""), + ) + + async def wait_for_transfer( + self, + transfer_path: str, + timeout: int = 300, + poll_interval: float = 0.5, + ) -> ObexTransfer: + """Wait for a transfer to complete. + + Args: + transfer_path: D-Bus path of the transfer + timeout: Maximum wait time in seconds + poll_interval: How often to check status + + Returns: + Final transfer status + """ + start_time = time.time() + + while True: + transfer = await self.get_transfer_status(transfer_path) + + if transfer.status in ("complete", "error"): + return transfer + + if time.time() - start_time > timeout: + raise TimeoutError(f"Transfer timed out after {timeout}s") + + await asyncio.sleep(poll_interval) + + async def cancel_transfer(self, transfer_path: str) -> None: + """Cancel an active transfer.""" + iface = await self._get_interface(transfer_path, OBEX_TRANSFER_IFACE) + await iface.call_cancel() + + # ==================== OPP (Object Push) Operations ==================== + + async def opp_send_file(self, session_id: str, local_path: str) -> str: + """Send a file via Object Push Profile. + + Args: + session_id: Session ID (must be OPP session) + local_path: Path to local file to send + + Returns: + Transfer path for monitoring + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "opp": + raise ValueError(f"Session {session_id} is not an OPP session") + + iface = await self._get_interface(session.path, OBEX_OPP_IFACE) + transfer_path, _ = await iface.call_send_file(local_path) + return transfer_path + + async def opp_pull_business_card(self, session_id: str, target_path: str) -> str: + """Pull the default business card (vCard) from device. + + Args: + session_id: Session ID (must be OPP session) + target_path: Local path to save the vCard + + Returns: + Transfer path for monitoring + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "opp": + raise ValueError(f"Session {session_id} is not an OPP session") + + iface = await self._get_interface(session.path, OBEX_OPP_IFACE) + transfer_path, _ = await iface.call_pull_business_card(target_path) + return transfer_path + + async def opp_exchange_business_cards( + self, session_id: str, client_vcard: str, target_path: str + ) -> str: + """Exchange business cards with device. + + Args: + session_id: Session ID + client_vcard: Path to local vCard to send + target_path: Path to save received vCard + + Returns: + Transfer path for monitoring + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "opp": + raise ValueError(f"Session {session_id} is not an OPP session") + + iface = await self._get_interface(session.path, OBEX_OPP_IFACE) + transfer_path, _ = await iface.call_exchange_business_cards(client_vcard, target_path) + return transfer_path + + # ==================== FTP (File Transfer) Operations ==================== + + async def ftp_change_folder(self, session_id: str, folder: str) -> None: + """Change the current folder. + + Args: + session_id: Session ID (must be FTP session) + folder: Folder name, ".." for parent, or "/" for root + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "ftp": + raise ValueError(f"Session {session_id} is not an FTP session") + + iface = await self._get_interface(session.path, OBEX_FTP_IFACE) + + # obexd uses different methods for navigation + if folder == "/": + # Go to root + await iface.call_change_folder("/") + elif folder == "..": + # Go to parent + await iface.call_change_folder("..") + else: + await iface.call_change_folder(folder) + + async def ftp_list_folder(self, session_id: str) -> list[FolderEntry]: + """List contents of current folder. + + Args: + session_id: Session ID (must be FTP session) + + Returns: + List of folder entries + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "ftp": + raise ValueError(f"Session {session_id} is not an FTP session") + + iface = await self._get_interface(session.path, OBEX_FTP_IFACE) + result = await iface.call_list_folder() + + entries = [] + for item in result: + item = unwrap_variant(item) + entries.append( + FolderEntry( + name=item.get("Name", ""), + type=item.get("Type", "file"), + size=item.get("Size", 0), + permissions=item.get("User-perm", ""), + modified=item.get("Modified", ""), + ) + ) + return entries + + async def ftp_get_file( + self, session_id: str, remote_path: str, local_path: str + ) -> str: + """Download a file from the device. + + Args: + session_id: Session ID (must be FTP session) + remote_path: Remote file name in current folder + local_path: Local path to save the file + + Returns: + Transfer path for monitoring + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "ftp": + raise ValueError(f"Session {session_id} is not an FTP session") + + iface = await self._get_interface(session.path, OBEX_FTP_IFACE) + transfer_path, _ = await iface.call_get_file(local_path, remote_path) + return transfer_path + + async def ftp_put_file( + self, session_id: str, local_path: str, remote_path: str + ) -> str: + """Upload a file to the device. + + Args: + session_id: Session ID (must be FTP session) + local_path: Local file path + remote_path: Remote file name in current folder + + Returns: + Transfer path for monitoring + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "ftp": + raise ValueError(f"Session {session_id} is not an FTP session") + + iface = await self._get_interface(session.path, OBEX_FTP_IFACE) + transfer_path, _ = await iface.call_put_file(local_path, remote_path) + return transfer_path + + async def ftp_copy_file( + self, session_id: str, source: str, destination: str + ) -> None: + """Copy a file on the device. + + Args: + session_id: Session ID (must be FTP session) + source: Source file name + destination: Destination file name + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "ftp": + raise ValueError(f"Session {session_id} is not an FTP session") + + iface = await self._get_interface(session.path, OBEX_FTP_IFACE) + await iface.call_copy_file(source, destination) + + async def ftp_move_file( + self, session_id: str, source: str, destination: str + ) -> None: + """Move/rename a file on the device. + + Args: + session_id: Session ID (must be FTP session) + source: Source file name + destination: Destination file name + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "ftp": + raise ValueError(f"Session {session_id} is not an FTP session") + + iface = await self._get_interface(session.path, OBEX_FTP_IFACE) + await iface.call_move_file(source, destination) + + async def ftp_delete(self, session_id: str, name: str) -> None: + """Delete a file or folder on the device. + + Args: + session_id: Session ID (must be FTP session) + name: File or folder name to delete + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "ftp": + raise ValueError(f"Session {session_id} is not an FTP session") + + iface = await self._get_interface(session.path, OBEX_FTP_IFACE) + await iface.call_delete(name) + + async def ftp_create_folder(self, session_id: str, name: str) -> None: + """Create a folder on the device. + + Args: + session_id: Session ID (must be FTP session) + name: Folder name to create + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "ftp": + raise ValueError(f"Session {session_id} is not an FTP session") + + iface = await self._get_interface(session.path, OBEX_FTP_IFACE) + await iface.call_create_folder(name) + + # ==================== PBAP (Phonebook) Operations ==================== + + async def pbap_select(self, session_id: str, location: str, phonebook: str) -> None: + """Select a phonebook for subsequent operations. + + Args: + session_id: Session ID (must be PBAP session) + location: "int" (internal) or "sim" (SIM card) + phonebook: Phonebook type - "pb" (contacts), "ich" (incoming calls), + "och" (outgoing calls), "mch" (missed calls), "cch" (combined) + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "pbap": + raise ValueError(f"Session {session_id} is not a PBAP session") + + iface = await self._get_interface(session.path, OBEX_PBAP_IFACE) + await iface.call_select(location, phonebook) + + async def pbap_pull_all(self, session_id: str, target_path: str) -> str: + """Download the entire phonebook as vCard. + + Args: + session_id: Session ID (must be PBAP session) + target_path: Local path to save the vCards + + Returns: + Transfer path for monitoring + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "pbap": + raise ValueError(f"Session {session_id} is not a PBAP session") + + iface = await self._get_interface(session.path, OBEX_PBAP_IFACE) + transfer_path, _ = await iface.call_pull_all(target_path, {}) + return transfer_path + + async def pbap_list(self, session_id: str) -> list[PhonebookEntry]: + """List phonebook entries (handles and names only). + + Args: + session_id: Session ID (must be PBAP session) + + Returns: + List of phonebook entries with handles + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "pbap": + raise ValueError(f"Session {session_id} is not a PBAP session") + + iface = await self._get_interface(session.path, OBEX_PBAP_IFACE) + result = await iface.call_list({}) + + entries = [] + for item in result: + item = unwrap_variant(item) + entries.append( + PhonebookEntry( + handle=item.get("Handle", ""), + name=item.get("Name", ""), + ) + ) + return entries + + async def pbap_pull(self, session_id: str, handle: str, target_path: str) -> str: + """Download a specific phonebook entry. + + Args: + session_id: Session ID (must be PBAP session) + handle: Entry handle from pbap_list + target_path: Local path to save the vCard + + Returns: + Transfer path for monitoring + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "pbap": + raise ValueError(f"Session {session_id} is not a PBAP session") + + iface = await self._get_interface(session.path, OBEX_PBAP_IFACE) + transfer_path, _ = await iface.call_pull(handle, target_path, {}) + return transfer_path + + async def pbap_search( + self, session_id: str, field: str, value: str + ) -> list[PhonebookEntry]: + """Search the phonebook. + + Args: + session_id: Session ID (must be PBAP session) + field: Field to search - "name", "number", or "sound" + value: Value to search for + + Returns: + List of matching entries + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "pbap": + raise ValueError(f"Session {session_id} is not a PBAP session") + + iface = await self._get_interface(session.path, OBEX_PBAP_IFACE) + result = await iface.call_search(field, value, {}) + + entries = [] + for item in result: + item = unwrap_variant(item) + entries.append( + PhonebookEntry( + handle=item.get("Handle", ""), + name=item.get("Name", ""), + ) + ) + return entries + + async def pbap_get_size(self, session_id: str) -> int: + """Get the number of entries in the selected phonebook. + + Args: + session_id: Session ID (must be PBAP session) + + Returns: + Number of entries + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "pbap": + raise ValueError(f"Session {session_id} is not a PBAP session") + + iface = await self._get_interface(session.path, OBEX_PBAP_IFACE) + size = await iface.call_get_size() + return size + + # ==================== MAP (Message Access) Operations ==================== + + async def map_set_folder(self, session_id: str, folder: str) -> None: + """Set the current message folder. + + Args: + session_id: Session ID (must be MAP session) + folder: Folder path (e.g., "/telecom/msg/inbox") + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "map": + raise ValueError(f"Session {session_id} is not a MAP session") + + iface = await self._get_interface(session.path, OBEX_MAP_IFACE) + await iface.call_set_folder(folder) + + async def map_list_folders(self, session_id: str) -> list[dict[str, Any]]: + """List message folders. + + Args: + session_id: Session ID (must be MAP session) + + Returns: + List of folder dictionaries + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "map": + raise ValueError(f"Session {session_id} is not a MAP session") + + iface = await self._get_interface(session.path, OBEX_MAP_IFACE) + result = await iface.call_list_folders({}) + + folders = [] + for item in result: + folders.append(unwrap_variant(item)) + return folders + + async def map_list_messages( + self, + session_id: str, + folder: str, + filters: dict[str, Any] | None = None, + ) -> list[MessageEntry]: + """List messages in a folder. + + Args: + session_id: Session ID (must be MAP session) + folder: Folder name + filters: Optional filters (max_count, start_offset, etc.) + + Returns: + List of message entries + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "map": + raise ValueError(f"Session {session_id} is not a MAP session") + + iface = await self._get_interface(session.path, OBEX_MAP_IFACE) + + # Convert filters to D-Bus format + dbus_filters: dict[str, Variant] = {} + if filters: + for key, value in filters.items(): + if isinstance(value, bool): + dbus_filters[key] = Variant("b", value) + elif isinstance(value, int): + dbus_filters[key] = Variant("u", value) + elif isinstance(value, str): + dbus_filters[key] = Variant("s", value) + + result = await iface.call_list_messages(folder, dbus_filters) + + entries = [] + for item in result: + item = unwrap_variant(item) + entries.append( + MessageEntry( + handle=item.get("Handle", ""), + subject=item.get("Subject", ""), + datetime=item.get("DateTime", ""), + sender_name=item.get("SenderName", ""), + sender_address=item.get("SenderAddress", ""), + type=item.get("Type", ""), + read=item.get("Read", False), + ) + ) + return entries + + async def map_get_message( + self, session_id: str, handle: str, target_path: str, attachment: bool = False + ) -> str: + """Download a specific message. + + Args: + session_id: Session ID (must be MAP session) + handle: Message handle + target_path: Local path to save the message + attachment: Whether to include attachments + + Returns: + Transfer path for monitoring + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "map": + raise ValueError(f"Session {session_id} is not a MAP session") + + iface = await self._get_interface(session.path, OBEX_MAP_IFACE) + transfer_path, _ = await iface.call_get_message( + handle, target_path, {"Attachment": Variant("b", attachment)} + ) + return transfer_path + + async def map_push_message( + self, session_id: str, folder: str, message_path: str + ) -> str: + """Push/send a message. + + Args: + session_id: Session ID (must be MAP session) + folder: Target folder + message_path: Path to message file (bMessage format) + + Returns: + Message handle + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "map": + raise ValueError(f"Session {session_id} is not a MAP session") + + iface = await self._get_interface(session.path, OBEX_MAP_IFACE) + handle = await iface.call_push_message(message_path, folder, {}) + return handle + + async def map_update_inbox(self, session_id: str) -> None: + """Request the server to refresh the inbox. + + Args: + session_id: Session ID (must be MAP session) + """ + session = self._sessions.get(session_id) + if not session: + raise ValueError(f"Unknown session: {session_id}") + if session.target != "map": + raise ValueError(f"Session {session_id} is not a MAP session") + + iface = await self._get_interface(session.path, OBEX_MAP_IFACE) + await iface.call_update_inbox() + + +# Global client instance +_client: ObexClient | None = None + + +async def get_obex_client() -> ObexClient: + """Get or create the global OBEX client.""" + global _client + if _client is None: + _client = ObexClient() + await _client.connect() + return _client + + +async def cleanup_stale_sessions() -> None: + """Clean up sessions that may be stale.""" + if _client: + # Check each session is still valid + for session_id in list(_client._sessions.keys()): + try: + await _client.get_session_info(session_id) + except Exception: + # Session no longer exists, remove from tracking + del _client._sessions[session_id] diff --git a/src/mcbluetooth/server.py b/src/mcbluetooth/server.py index cccceaa..bdd44dc 100644 --- a/src/mcbluetooth/server.py +++ b/src/mcbluetooth/server.py @@ -3,7 +3,7 @@ from fastmcp import FastMCP from mcbluetooth import resources -from mcbluetooth.tools import adapter, audio, ble, device, monitor +from mcbluetooth.tools import adapter, audio, ble, device, monitor, obex mcp = FastMCP( name="mcbluetooth", @@ -14,6 +14,7 @@ This server provides comprehensive control over the Linux Bluetooth stack: - Device discovery and management (scan, pair, connect) - Audio profiles (A2DP, HFP) with PipeWire/PulseAudio integration - BLE/GATT services (read/write characteristics, notifications) +- OBEX profiles (file transfer, phonebook access, messages) ## Resources (live state queries) - bluetooth://adapters - All Bluetooth adapters @@ -44,6 +45,7 @@ device.register_tools(mcp) audio.register_tools(mcp) ble.register_tools(mcp) monitor.register_tools(mcp) +obex.register_tools(mcp) def main(): diff --git a/src/mcbluetooth/tools/__init__.py b/src/mcbluetooth/tools/__init__.py index 9776b08..d1d006c 100644 --- a/src/mcbluetooth/tools/__init__.py +++ b/src/mcbluetooth/tools/__init__.py @@ -1,5 +1,5 @@ """MCP tool modules for Bluetooth management.""" -from mcbluetooth.tools import adapter, audio, ble, device, monitor +from mcbluetooth.tools import adapter, audio, ble, device, monitor, obex -__all__ = ["adapter", "device", "audio", "ble", "monitor"] +__all__ = ["adapter", "device", "audio", "ble", "monitor", "obex"] diff --git a/src/mcbluetooth/tools/obex.py b/src/mcbluetooth/tools/obex.py new file mode 100644 index 0000000..ef6f761 --- /dev/null +++ b/src/mcbluetooth/tools/obex.py @@ -0,0 +1,1395 @@ +"""OBEX profile tools for BlueZ. + +OBEX (Object Exchange) provides file transfer capabilities over Bluetooth: +- OPP (Object Push): Simple file sending +- FTP (File Transfer): Full file system browsing +- PBAP (Phonebook Access): Contact/call history access +- MAP (Message Access): SMS/MMS/email access + +Requires the obexd daemon to be installed and running. +""" + +import asyncio +import shutil +from dataclasses import asdict +from pathlib import Path +from typing import Any, Literal + +from fastmcp import Context, FastMCP + +from mcbluetooth.obex_client import ( + ObexTarget, + get_obex_client, +) + +# Type aliases +PbapLocation = Literal["int", "sim"] +PbapPhonebook = Literal["pb", "ich", "och", "mch", "cch"] +PbapSearchField = Literal["name", "number", "sound"] + + +def register_tools(mcp: FastMCP) -> None: + """Register OBEX tools with the MCP server.""" + + # ─────────────── Setup & Status ─────────────── + + @mcp.tool() + async def bt_obex_status( + ctx: Context | None = None, + ) -> dict[str, Any]: + """Check OBEX subsystem status and requirements. + + Verifies that obexd is installed and accessible. Returns status + of the OBEX daemon and any active sessions. + + Returns: + Status dict with obexd availability and active sessions + """ + result: dict[str, Any] = { + "obexd_installed": False, + "obexd_path": None, + "obexd_running": False, + "dbus_accessible": False, + "active_sessions": [], + "issues": [], + "recommendations": [], + } + + # Check if obexd binary exists + obexd_paths = [ + "/usr/lib/bluetooth/obexd", + "/usr/libexec/bluetooth/obexd", + "/usr/lib/bluez/obexd", + ] + + obexd_path = None + for path in obexd_paths: + if Path(path).exists(): + obexd_path = path + break + + # Also check if it's in PATH (for custom installs) + if not obexd_path: + obexd_path = shutil.which("obexd") + + if obexd_path: + result["obexd_installed"] = True + result["obexd_path"] = obexd_path + if ctx: + await ctx.debug(f"Found obexd at {obexd_path}") + else: + result["issues"].append("obexd binary not found") + result["recommendations"].append( + "Install bluez-obex package (Arch: pacman -S bluez-obex, " + "Debian/Ubuntu: apt install bluez-obex)" + ) + + # Check if obexd is running (via pgrep) + try: + proc = await asyncio.create_subprocess_exec( + "pgrep", "-x", "obexd", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.DEVNULL, + ) + stdout, _ = await proc.communicate() + if proc.returncode == 0 and stdout.strip(): + result["obexd_running"] = True + if ctx: + await ctx.debug("obexd is running") + else: + result["issues"].append("obexd daemon is not running") + result["recommendations"].append( + "Start obexd with: /usr/lib/bluetooth/obexd (or add to autostart)" + ) + except Exception: + result["issues"].append("Could not check if obexd is running") + + # Try to connect to D-Bus + try: + client = await get_obex_client() + result["dbus_accessible"] = True + + # Get active sessions + sessions = client.list_sessions() + result["active_sessions"] = [ + { + "session_id": s.session_id, + "address": s.address, + "target": s.target, + } + for s in sessions + ] + + if ctx: + await ctx.info( + f"OBEX ready: {len(sessions)} active session(s)" + ) + except Exception as e: + error_msg = str(e) + result["dbus_accessible"] = False + result["issues"].append(f"Cannot connect to obexd D-Bus: {error_msg}") + + if "org.bluez.obex" in error_msg or "ServiceUnknown" in error_msg: + result["recommendations"].append( + "obexd is not registered on D-Bus. Start it manually: " + "/usr/lib/bluetooth/obexd" + ) + elif "Connection refused" in error_msg: + result["recommendations"].append( + "Session D-Bus not available. This usually requires a desktop session." + ) + + # Determine overall status + if result["obexd_installed"] and result["dbus_accessible"]: + result["status"] = "ready" + if ctx: + await ctx.info("OBEX subsystem is ready") + elif result["obexd_installed"]: + result["status"] = "obexd_not_running" + else: + result["status"] = "not_installed" + + return result + + @mcp.tool() + async def bt_obex_start_daemon( + ctx: Context | None = None, + ) -> dict[str, Any]: + """Attempt to start the obexd daemon. + + Tries to launch obexd as a background process. This may fail if + obexd is already running or if permissions are insufficient. + + Returns: + Status of the daemon start attempt + """ + # Find obexd + obexd_paths = [ + "/usr/lib/bluetooth/obexd", + "/usr/libexec/bluetooth/obexd", + "/usr/lib/bluez/obexd", + ] + + obexd_path = None + for path in obexd_paths: + if Path(path).exists(): + obexd_path = path + break + + if not obexd_path: + return { + "success": False, + "error": "obexd not found. Install bluez-obex package.", + } + + # Check if already running + try: + proc = await asyncio.create_subprocess_exec( + "pgrep", "-x", "obexd", + stdout=asyncio.subprocess.PIPE, + ) + stdout, _ = await proc.communicate() + if proc.returncode == 0 and stdout.strip(): + if ctx: + await ctx.info("obexd is already running") + return { + "success": True, + "status": "already_running", + "pid": stdout.decode().strip(), + } + except Exception: + pass + + # Try to start obexd + if ctx: + await ctx.info(f"Starting obexd from {obexd_path}") + + try: + # Start obexd in background + proc = await asyncio.create_subprocess_exec( + obexd_path, + stdout=asyncio.subprocess.DEVNULL, + stderr=asyncio.subprocess.PIPE, + start_new_session=True, # Detach from this process group + ) + + # Wait briefly to see if it starts + await asyncio.sleep(1.0) + + # Check if it's running + check = await asyncio.create_subprocess_exec( + "pgrep", "-x", "obexd", + stdout=asyncio.subprocess.PIPE, + ) + stdout, _ = await check.communicate() + + if check.returncode == 0 and stdout.strip(): + if ctx: + await ctx.info("obexd started successfully") + return { + "success": True, + "status": "started", + "pid": stdout.decode().strip(), + } + else: + # Try to get error message + if proc.stderr: + stderr = await proc.stderr.read() + error_msg = stderr.decode().strip() + else: + error_msg = "Unknown error" + + return { + "success": False, + "error": f"obexd failed to start: {error_msg}", + "suggestions": [ + "Check D-Bus session is available", + "Run manually: " + obexd_path, + "Check logs: journalctl --user -u obex", + ], + } + + except Exception as e: + return {"success": False, "error": str(e)} + + # ─────────────── Session Management ─────────────── + + @mcp.tool() + async def bt_obex_connect( + address: str, + target: ObexTarget, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Create an OBEX session to a device. + + Establishes a connection for file transfer or data access. + The session_id is used for subsequent operations. + + Args: + address: Device Bluetooth address (e.g., "AA:BB:CC:DD:EE:FF") + target: OBEX profile to use: + - "opp": Object Push (send files) + - "ftp": File Transfer (browse & transfer files) + - "pbap": Phonebook Access (read contacts) + - "map": Message Access (read SMS/MMS) + + Returns: + Session info with session_id for subsequent operations + """ + if ctx: + await ctx.info(f"Connecting OBEX {target.upper()} to {address}") + + try: + client = await get_obex_client() + session = await client.create_session(address, target) + + if ctx: + await ctx.info(f"Session created: {session.session_id}") + + return { + "success": True, + "session_id": session.session_id, + "address": session.address, + "target": session.target, + "source": session.source, + "root": session.root, + } + except Exception as e: + error_msg = str(e) + if ctx: + await ctx.error(f"Failed to create session: {error_msg}") + + hints = [] + if "NotFound" in error_msg or "DoesNotExist" in error_msg: + hints.append("Device may not be in range or discoverable") + hints.append("Ensure device is paired first with bt_pair") + if "NotAuthorized" in error_msg or "Rejected" in error_msg: + hints.append("Device rejected the connection") + hints.append(f"Enable {target.upper()} profile on the device") + if "NotSupported" in error_msg: + hints.append(f"Device does not support {target.upper()} profile") + + return {"success": False, "error": error_msg, "hints": hints} + + @mcp.tool() + async def bt_obex_disconnect( + session_id: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Close an OBEX session. + + Args: + session_id: Session ID from bt_obex_connect + + Returns: + Status of the disconnect operation + """ + if ctx: + await ctx.info(f"Closing OBEX session: {session_id}") + + try: + client = await get_obex_client() + await client.remove_session(session_id) + + if ctx: + await ctx.info("Session closed") + return {"success": True, "session_id": session_id} + except Exception as e: + if ctx: + await ctx.error(f"Failed to close session: {e}") + return {"success": False, "error": str(e)} + + @mcp.tool() + async def bt_obex_sessions( + ctx: Context | None = None, + ) -> dict[str, Any]: + """List active OBEX sessions. + + Returns: + List of active sessions with their details + """ + try: + client = await get_obex_client() + sessions = client.list_sessions() + + if ctx: + await ctx.info(f"Found {len(sessions)} active session(s)") + + return { + "success": True, + "sessions": [asdict(s) for s in sessions], + "count": len(sessions), + } + except Exception as e: + return {"success": False, "error": str(e)} + + # ─────────────── OPP (Object Push) ─────────────── + + @mcp.tool() + async def bt_obex_send_file( + address: str, + file_path: str, + wait: bool = True, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Send a file to a device via Object Push Profile. + + Creates a temporary OPP session, sends the file, and closes the session. + The receiving device will show a prompt to accept/reject. + + Args: + address: Device Bluetooth address + file_path: Path to the local file to send + wait: Wait for transfer to complete (default True) + + Returns: + Transfer status and file info + """ + # Validate file exists + path = Path(file_path).expanduser().resolve() + if not path.exists(): + return {"success": False, "error": f"File not found: {file_path}"} + + if ctx: + await ctx.info(f"Sending {path.name} to {address}") + + client = await get_obex_client() + session = None + + try: + # Create OPP session + session = await client.create_session(address, "opp") + if ctx: + await ctx.debug(f"OPP session created: {session.session_id}") + + # Start transfer + transfer_path = await client.opp_send_file(session.session_id, str(path)) + if ctx: + await ctx.debug(f"Transfer started: {transfer_path}") + + if wait: + # Wait for completion with progress + transfer = await client.wait_for_transfer(transfer_path, timeout=300) + + if transfer.status == "complete": + if ctx: + await ctx.info(f"File sent successfully: {path.name}") + return { + "success": True, + "status": "complete", + "filename": path.name, + "size": transfer.size, + "transferred": transfer.transferred, + } + else: + if ctx: + await ctx.error(f"Transfer failed: {transfer.status}") + return { + "success": False, + "status": transfer.status, + "error": "Transfer failed or was rejected", + } + else: + # Return immediately + return { + "success": True, + "status": "started", + "session_id": session.session_id, + "transfer_path": transfer_path, + "message": "Transfer started. Use bt_obex_transfer_status to monitor.", + } + + except Exception as e: + if ctx: + await ctx.error(f"Send failed: {e}") + return {"success": False, "error": str(e)} + + finally: + # Clean up session (if we waited for completion) + if session and wait: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + @mcp.tool() + async def bt_obex_get_vcard( + address: str, + save_path: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Pull the default business card (vCard) from a device. + + Uses Object Push Profile to retrieve the device's default vCard. + + Args: + address: Device Bluetooth address + save_path: Local path to save the vCard file + + Returns: + Status and saved file path + """ + save_path_obj = Path(save_path).expanduser().resolve() + save_path_obj.parent.mkdir(parents=True, exist_ok=True) + + if ctx: + await ctx.info(f"Pulling vCard from {address}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "opp") + transfer_path = await client.opp_pull_business_card( + session.session_id, str(save_path_obj) + ) + + transfer = await client.wait_for_transfer(transfer_path, timeout=60) + + if transfer.status == "complete": + if ctx: + await ctx.info(f"vCard saved to {save_path_obj}") + return { + "success": True, + "status": "complete", + "file": str(save_path_obj), + "size": transfer.size, + } + else: + return { + "success": False, + "status": transfer.status, + "error": "Failed to retrieve vCard", + } + + except Exception as e: + if ctx: + await ctx.error(f"Failed to get vCard: {e}") + return {"success": False, "error": str(e)} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + # ─────────────── FTP (File Transfer) ─────────────── + + @mcp.tool() + async def bt_obex_browse( + session_id: str, + path: str = "/", + ctx: Context | None = None, + ) -> dict[str, Any]: + """List files and folders on the device. + + Requires an active FTP session. Navigate using path parameter: + - "/" for root + - ".." to go up + - "folder_name" to enter a folder + + Args: + session_id: FTP session ID from bt_obex_connect + path: Path to navigate to before listing ("/" = root, ".." = parent) + + Returns: + List of files and folders with metadata + """ + client = await get_obex_client() + session = client.get_session(session_id) + + if not session: + return {"success": False, "error": f"Unknown session: {session_id}"} + if session.target != "ftp": + return {"success": False, "error": "Session is not an FTP session"} + + try: + # Navigate if path specified + if path and path != ".": + if ctx: + await ctx.debug(f"Navigating to: {path}") + await client.ftp_change_folder(session_id, path) + + # List contents + entries = await client.ftp_list_folder(session_id) + + if ctx: + await ctx.info(f"Found {len(entries)} items") + + return { + "success": True, + "entries": [asdict(e) for e in entries], + "count": len(entries), + } + + except Exception as e: + if ctx: + await ctx.error(f"Browse failed: {e}") + return {"success": False, "error": str(e)} + + @mcp.tool() + async def bt_obex_get( + session_id: str, + remote_path: str, + local_path: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Download a file from the device. + + Requires an active FTP session. The remote_path is relative to + the current folder (use bt_obex_browse to navigate first). + + Args: + session_id: FTP session ID + remote_path: File name on the device + local_path: Local path to save the file + + Returns: + Transfer status + """ + local_path_obj = Path(local_path).expanduser().resolve() + local_path_obj.parent.mkdir(parents=True, exist_ok=True) + + client = await get_obex_client() + session = client.get_session(session_id) + + if not session: + return {"success": False, "error": f"Unknown session: {session_id}"} + if session.target != "ftp": + return {"success": False, "error": "Session is not an FTP session"} + + if ctx: + await ctx.info(f"Downloading {remote_path}") + + try: + transfer_path = await client.ftp_get_file( + session_id, remote_path, str(local_path_obj) + ) + + transfer = await client.wait_for_transfer(transfer_path, timeout=300) + + if transfer.status == "complete": + if ctx: + await ctx.info(f"Downloaded to {local_path_obj}") + return { + "success": True, + "status": "complete", + "local_path": str(local_path_obj), + "size": transfer.size, + } + else: + return { + "success": False, + "status": transfer.status, + "error": "Download failed", + } + + except Exception as e: + if ctx: + await ctx.error(f"Download failed: {e}") + return {"success": False, "error": str(e)} + + @mcp.tool() + async def bt_obex_put( + session_id: str, + local_path: str, + remote_path: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Upload a file to the device. + + Requires an active FTP session. The remote_path is the filename + to use in the current remote folder. + + Args: + session_id: FTP session ID + local_path: Path to local file + remote_path: Filename to use on device + + Returns: + Transfer status + """ + local_path_obj = Path(local_path).expanduser().resolve() + if not local_path_obj.exists(): + return {"success": False, "error": f"File not found: {local_path}"} + + client = await get_obex_client() + session = client.get_session(session_id) + + if not session: + return {"success": False, "error": f"Unknown session: {session_id}"} + if session.target != "ftp": + return {"success": False, "error": "Session is not an FTP session"} + + if ctx: + await ctx.info(f"Uploading {local_path_obj.name} to {remote_path}") + + try: + transfer_path = await client.ftp_put_file( + session_id, str(local_path_obj), remote_path + ) + + transfer = await client.wait_for_transfer(transfer_path, timeout=300) + + if transfer.status == "complete": + if ctx: + await ctx.info(f"Uploaded {transfer.size} bytes") + return { + "success": True, + "status": "complete", + "remote_path": remote_path, + "size": transfer.size, + } + else: + return { + "success": False, + "status": transfer.status, + "error": "Upload failed", + } + + except Exception as e: + if ctx: + await ctx.error(f"Upload failed: {e}") + return {"success": False, "error": str(e)} + + @mcp.tool() + async def bt_obex_delete( + session_id: str, + remote_path: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Delete a file or folder on the device. + + Args: + session_id: FTP session ID + remote_path: File or folder name to delete + + Returns: + Status of the delete operation + """ + client = await get_obex_client() + session = client.get_session(session_id) + + if not session: + return {"success": False, "error": f"Unknown session: {session_id}"} + if session.target != "ftp": + return {"success": False, "error": "Session is not an FTP session"} + + if ctx: + await ctx.info(f"Deleting {remote_path}") + + try: + await client.ftp_delete(session_id, remote_path) + if ctx: + await ctx.info("Deleted successfully") + return {"success": True, "deleted": remote_path} + except Exception as e: + if ctx: + await ctx.error(f"Delete failed: {e}") + return {"success": False, "error": str(e)} + + @mcp.tool() + async def bt_obex_mkdir( + session_id: str, + folder_name: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Create a folder on the device. + + Args: + session_id: FTP session ID + folder_name: Name of folder to create + + Returns: + Status of the mkdir operation + """ + client = await get_obex_client() + session = client.get_session(session_id) + + if not session: + return {"success": False, "error": f"Unknown session: {session_id}"} + if session.target != "ftp": + return {"success": False, "error": "Session is not an FTP session"} + + if ctx: + await ctx.info(f"Creating folder: {folder_name}") + + try: + await client.ftp_create_folder(session_id, folder_name) + if ctx: + await ctx.info("Folder created") + return {"success": True, "folder": folder_name} + except Exception as e: + if ctx: + await ctx.error(f"mkdir failed: {e}") + return {"success": False, "error": str(e)} + + # ─────────────── PBAP (Phonebook) ─────────────── + + @mcp.tool() + async def bt_phonebook_pull( + address: str, + save_path: str, + location: PbapLocation = "int", + phonebook: PbapPhonebook = "pb", + ctx: Context | None = None, + ) -> dict[str, Any]: + """Download the entire phonebook as a vCard file. + + Creates a PBAP session, downloads all contacts, and saves them. + + Args: + address: Device Bluetooth address + save_path: Local path to save the vCard file + location: "int" (internal memory) or "sim" (SIM card) + phonebook: Which phonebook to pull: + - "pb": Main phonebook/contacts + - "ich": Incoming call history + - "och": Outgoing call history + - "mch": Missed call history + - "cch": Combined call history + + Returns: + Status and file info + """ + save_path_obj = Path(save_path).expanduser().resolve() + save_path_obj.parent.mkdir(parents=True, exist_ok=True) + + if ctx: + await ctx.info(f"Pulling {phonebook} from {address}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "pbap") + if ctx: + await ctx.debug(f"PBAP session: {session.session_id}") + + # Select phonebook + await client.pbap_select(session.session_id, location, phonebook) + + # Pull all entries + transfer_path = await client.pbap_pull_all( + session.session_id, str(save_path_obj) + ) + + transfer = await client.wait_for_transfer(transfer_path, timeout=120) + + if transfer.status == "complete": + if ctx: + await ctx.info(f"Phonebook saved to {save_path_obj}") + return { + "success": True, + "status": "complete", + "file": str(save_path_obj), + "size": transfer.size, + "phonebook": phonebook, + "location": location, + } + else: + return { + "success": False, + "status": transfer.status, + "error": "Failed to pull phonebook", + } + + except Exception as e: + if ctx: + await ctx.error(f"Phonebook pull failed: {e}") + return {"success": False, "error": str(e)} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + @mcp.tool() + async def bt_phonebook_list( + address: str, + location: PbapLocation = "int", + phonebook: PbapPhonebook = "pb", + ctx: Context | None = None, + ) -> dict[str, Any]: + """List phonebook entries with handles and names. + + Use the handles with bt_phonebook_get to download individual entries. + + Args: + address: Device Bluetooth address + location: "int" (internal) or "sim" + phonebook: Which phonebook to list + + Returns: + List of phonebook entries + """ + if ctx: + await ctx.info(f"Listing {phonebook} from {address}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "pbap") + await client.pbap_select(session.session_id, location, phonebook) + + entries = await client.pbap_list(session.session_id) + + if ctx: + await ctx.info(f"Found {len(entries)} entries") + + return { + "success": True, + "entries": [asdict(e) for e in entries], + "count": len(entries), + "phonebook": phonebook, + } + + except Exception as e: + if ctx: + await ctx.error(f"List failed: {e}") + return {"success": False, "error": str(e)} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + @mcp.tool() + async def bt_phonebook_get( + address: str, + handle: str, + save_path: str, + location: PbapLocation = "int", + phonebook: PbapPhonebook = "pb", + ctx: Context | None = None, + ) -> dict[str, Any]: + """Download a single phonebook entry by handle. + + Args: + address: Device Bluetooth address + handle: Entry handle from bt_phonebook_list + save_path: Local path to save the vCard + location: "int" or "sim" + phonebook: Which phonebook + + Returns: + Status and file path + """ + save_path_obj = Path(save_path).expanduser().resolve() + save_path_obj.parent.mkdir(parents=True, exist_ok=True) + + if ctx: + await ctx.info(f"Getting entry {handle} from {address}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "pbap") + await client.pbap_select(session.session_id, location, phonebook) + + transfer_path = await client.pbap_pull( + session.session_id, handle, str(save_path_obj) + ) + + transfer = await client.wait_for_transfer(transfer_path, timeout=30) + + if transfer.status == "complete": + if ctx: + await ctx.info(f"Entry saved to {save_path_obj}") + return { + "success": True, + "file": str(save_path_obj), + "size": transfer.size, + } + else: + return {"success": False, "error": "Failed to get entry"} + + except Exception as e: + if ctx: + await ctx.error(f"Get failed: {e}") + return {"success": False, "error": str(e)} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + @mcp.tool() + async def bt_phonebook_search( + address: str, + field: PbapSearchField, + value: str, + location: PbapLocation = "int", + phonebook: PbapPhonebook = "pb", + ctx: Context | None = None, + ) -> dict[str, Any]: + """Search the phonebook. + + Args: + address: Device Bluetooth address + field: Field to search - "name", "number", or "sound" + value: Search value + location: "int" or "sim" + phonebook: Which phonebook + + Returns: + Matching entries + """ + if ctx: + await ctx.info(f"Searching {field}='{value}' in {phonebook}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "pbap") + await client.pbap_select(session.session_id, location, phonebook) + + entries = await client.pbap_search(session.session_id, field, value) + + if ctx: + await ctx.info(f"Found {len(entries)} matches") + + return { + "success": True, + "entries": [asdict(e) for e in entries], + "count": len(entries), + } + + except Exception as e: + if ctx: + await ctx.error(f"Search failed: {e}") + return {"success": False, "error": str(e)} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + @mcp.tool() + async def bt_phonebook_count( + address: str, + location: PbapLocation = "int", + phonebook: PbapPhonebook = "pb", + ctx: Context | None = None, + ) -> dict[str, Any]: + """Get the number of entries in a phonebook. + + Args: + address: Device Bluetooth address + location: "int" or "sim" + phonebook: Which phonebook + + Returns: + Count of entries + """ + if ctx: + await ctx.info(f"Counting entries in {phonebook}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "pbap") + await client.pbap_select(session.session_id, location, phonebook) + + count = await client.pbap_get_size(session.session_id) + + if ctx: + await ctx.info(f"Phonebook has {count} entries") + + return { + "success": True, + "count": count, + "phonebook": phonebook, + "location": location, + } + + except Exception as e: + if ctx: + await ctx.error(f"Count failed: {e}") + return {"success": False, "error": str(e)} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + # ─────────────── MAP (Messages) ─────────────── + + @mcp.tool() + async def bt_messages_folders( + address: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """List message folders on the device. + + Args: + address: Device Bluetooth address + + Returns: + List of message folders + """ + if ctx: + await ctx.info(f"Listing message folders from {address}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "map") + + folders = await client.map_list_folders(session.session_id) + + if ctx: + await ctx.info(f"Found {len(folders)} folders") + + return { + "success": True, + "folders": folders, + "count": len(folders), + } + + except Exception as e: + if ctx: + await ctx.error(f"List folders failed: {e}") + return {"success": False, "error": str(e)} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + @mcp.tool() + async def bt_messages_list( + address: str, + folder: str = "inbox", + unread_only: bool = False, + max_count: int = 100, + ctx: Context | None = None, + ) -> dict[str, Any]: + """List messages in a folder. + + Args: + address: Device Bluetooth address + folder: Folder name (e.g., "inbox", "sent", "draft") + unread_only: Only return unread messages + max_count: Maximum messages to return + + Returns: + List of messages with metadata + """ + if ctx: + await ctx.info(f"Listing messages in {folder}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "map") + + filters = {"MaxListCount": max_count} + if unread_only: + filters["FilterReadStatus"] = "unread" + + messages = await client.map_list_messages( + session.session_id, folder, filters + ) + + if ctx: + await ctx.info(f"Found {len(messages)} messages") + + return { + "success": True, + "messages": [asdict(m) for m in messages], + "count": len(messages), + "folder": folder, + } + + except Exception as e: + if ctx: + await ctx.error(f"List messages failed: {e}") + return {"success": False, "error": str(e)} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + @mcp.tool() + async def bt_messages_get( + address: str, + handle: str, + save_path: str, + include_attachment: bool = False, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Download a specific message by handle. + + Args: + address: Device Bluetooth address + handle: Message handle from bt_messages_list + save_path: Local path to save the message + include_attachment: Include attachments in download + + Returns: + Status and file path + """ + save_path_obj = Path(save_path).expanduser().resolve() + save_path_obj.parent.mkdir(parents=True, exist_ok=True) + + if ctx: + await ctx.info(f"Getting message {handle}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "map") + + transfer_path = await client.map_get_message( + session.session_id, handle, str(save_path_obj), include_attachment + ) + + transfer = await client.wait_for_transfer(transfer_path, timeout=60) + + if transfer.status == "complete": + if ctx: + await ctx.info(f"Message saved to {save_path_obj}") + return { + "success": True, + "file": str(save_path_obj), + "size": transfer.size, + } + else: + return {"success": False, "error": "Failed to get message"} + + except Exception as e: + if ctx: + await ctx.error(f"Get message failed: {e}") + return {"success": False, "error": str(e)} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + @mcp.tool() + async def bt_messages_send( + address: str, + recipient: str, + message: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Send an SMS message via MAP. + + Note: This requires the phone to support MAP message pushing. + Not all devices support this feature. + + Args: + address: Device Bluetooth address + recipient: Phone number to send to + message: Message text + + Returns: + Status of the send operation + """ + import tempfile + + if ctx: + await ctx.info(f"Sending message to {recipient}") + + client = await get_obex_client() + session = None + + try: + session = await client.create_session(address, "map") + + # Create bMessage format + bmessage = f"""BEGIN:BMSG +VERSION:1.0 +STATUS:UNREAD +TYPE:SMS_GSM +FOLDER: +BEGIN:VCARD +VERSION:2.1 +TEL:{recipient} +END:VCARD +BEGIN:BENV +BEGIN:BBODY +CHARSET:UTF-8 +LENGTH:{len(message.encode('utf-8'))} +BEGIN:MSG +{message} +END:MSG +END:BBODY +END:BENV +END:BMSG +""" + + # Write to temp file + with tempfile.NamedTemporaryFile( + mode="w", suffix=".bmsg", delete=False + ) as f: + f.write(bmessage) + msg_path = f.name + + try: + handle = await client.map_push_message( + session.session_id, "outbox", msg_path + ) + + if ctx: + await ctx.info(f"Message sent, handle: {handle}") + + return { + "success": True, + "handle": handle, + "recipient": recipient, + } + + finally: + # Clean up temp file + Path(msg_path).unlink(missing_ok=True) + + except Exception as e: + error_msg = str(e) + if ctx: + await ctx.error(f"Send failed: {e}") + + hints = [] + if "NotSupported" in error_msg: + hints.append("This device may not support sending messages via MAP") + + return {"success": False, "error": error_msg, "hints": hints} + + finally: + if session: + try: + await client.remove_session(session.session_id) + except Exception: + pass + + # ─────────────── Transfer Management ─────────────── + + @mcp.tool() + async def bt_obex_transfer_status( + transfer_path: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Check the status of an OBEX transfer. + + Args: + transfer_path: Transfer path from a transfer operation + + Returns: + Transfer status and progress + """ + try: + client = await get_obex_client() + transfer = await client.get_transfer_status(transfer_path) + + progress = 0 + if transfer.size > 0: + progress = int((transfer.transferred / transfer.size) * 100) + + if ctx: + await ctx.info(f"Transfer: {transfer.status} ({progress}%)") + + return { + "success": True, + "status": transfer.status, + "name": transfer.name, + "size": transfer.size, + "transferred": transfer.transferred, + "progress_percent": progress, + } + + except Exception as e: + return {"success": False, "error": str(e)} + + @mcp.tool() + async def bt_obex_transfer_cancel( + transfer_path: str, + ctx: Context | None = None, + ) -> dict[str, Any]: + """Cancel an active OBEX transfer. + + Args: + transfer_path: Transfer path to cancel + + Returns: + Status of cancellation + """ + try: + client = await get_obex_client() + await client.cancel_transfer(transfer_path) + + if ctx: + await ctx.info("Transfer cancelled") + + return {"success": True, "cancelled": transfer_path} + + except Exception as e: + if ctx: + await ctx.error(f"Cancel failed: {e}") + return {"success": False, "error": str(e)}