diff --git a/README.md b/README.md index abe851e..9dbe0bd 100644 --- a/README.md +++ b/README.md @@ -130,14 +130,88 @@ cd examples/fractal-agent-coordination/ ### 🏃‍♂️ FastMCP MQTT Tools +**Core MQTT Tools:** - `mqtt_connect` - Connect to MQTT brokers -- `mqtt_publish` - Publish messages with QoS support +- `mqtt_publish` - Publish messages with QoS support - `mqtt_subscribe` - Subscribe to topics with wildcards - `mqtt_get_messages` - Retrieve received messages - `mqtt_status` - Get connection and statistics - `mqtt_spawn_broker` - Create embedded brokers instantly - `mqtt_list_brokers` - Manage multiple brokers +**Agent Coordination Tools (NEW!):** +- `mqtt_host_conversation` - **Host** a coordinated agent-to-agent conversation +- `mqtt_join_conversation` - **Join** a conversation hosted by another agent +- `mqtt_conversation_status` - Get status of active conversations +- `mqtt_list_conversations` - List all active conversations + +### 🤝 Agent-to-Agent Coordination Protocol + +**Solving the "Ships Passing in the Night" Problem** + +When two agents try to communicate via MQTT, a common issue occurs: Agent A publishes a message before Agent B has subscribed, resulting in dropped messages. mcmqtt's coordination protocol solves this with a **host/join handshake**: + +``` +┌─────────────────┐ ┌─────────────────┐ +│ Agent A │ │ Agent B │ +│ (Initiator) │ │ (Invited) │ +└────────┬────────┘ └────────┬────────┘ + │ │ + │ 1. mqtt_host_conversation() │ + │ - Spawns broker │ + │ - Waits for Agent B │ + │ ────────────────────► │ + │ │ + │ 2. mqtt_join_conversation() + │ - Connects to broker + │ - Signals ready + │ ◄──────────────────── + │ │ + │ 3. Host acknowledges join │ + │ ────────────────────► │ + │ │ + │ 4. All ready - safe to publish! │ + │ ◄───────────────────► │ + └───────────────────────────────────────┘ +``` + +**Usage Example:** + +```python +# Agent A (initiating the conversation): +result = await mqtt_host_conversation( + session_id="task-collaboration-123", + host_agent_id="coordinator", + expected_agents=["worker-1", "worker-2"], + timeout_seconds=30 +) +# Returns only when ALL expected agents have joined and subscribed! + +if result["ready_to_publish"]: + await mqtt_publish(topic=result["conversation_topic"], payload="Hello team!") +``` + +```python +# Agent B (joining the conversation): +result = await mqtt_join_conversation( + session_id="task-collaboration-123", + agent_id="worker-1", + broker_host="127.0.0.1", + broker_port=1883 +) +# Returns only after host acknowledges and all agents are ready! + +if result["ready_to_receive"]: + # Now safe to receive messages - guaranteed not to miss any! + messages = await mqtt_get_messages(topic="conversation/task-collaboration-123/main") +``` + +**Key Guarantees:** +- **No dropped messages** - Host waits for all agents to subscribe before publishing +- **Clear ownership** - Initiating agent always hosts the broker +- **Timeout protection** - Configurable timeout prevents infinite waits +- **State visibility** - Check conversation status at any time + ### 🔧 Embedded Broker Management **MCP clients can spawn MQTT brokers on-demand using the `mqtt_spawn_broker` tool:** @@ -227,12 +301,13 @@ This isn't your typical monolithic MQTT library. mcmqtt features a **clean modul ``` mcmqtt/ ├── cli/ # Command-line interface & argument parsing -├── config/ # Environment & configuration management +├── config/ # Environment & configuration management ├── logging/ # Structured logging setup ├── server/ # STDIO & HTTP server runners ├── mqtt/ # Core MQTT client functionality ├── mcp/ # FastMCP server integration ├── broker/ # Embedded broker management +├── coordination/ # Agent-to-agent handshake protocol (NEW!) └── middleware/ # Broker middleware & orchestration ``` diff --git a/docs-site/astro.config.mjs b/docs-site/astro.config.mjs new file mode 100644 index 0000000..4b1a0c9 --- /dev/null +++ b/docs-site/astro.config.mjs @@ -0,0 +1,210 @@ +import { defineConfig } from 'astro/config'; +import starlight from '@astrojs/starlight'; +import alpinejs from '@astrojs/alpinejs'; + +export default defineConfig({ + site: 'https://mcmqtt.dev', + base: '/', + trailingSlash: 'ignore', + build: { + format: 'directory', + }, + integrations: [ + alpinejs(), + starlight({ + title: 'mcmqtt - AI Coordination Platform', + description: 'The definitive platform for AI coordination with revolutionary fractal agent swarms. Deploy sophisticated multi-agent systems with zero-config infrastructure and production-grade safety.', + logo: { + src: './src/assets/mcmqtt-logo.svg', + replacesTitle: true, + }, + social: { + github: 'https://git.supported.systems/MCP/mcmqtt', + discord: 'https://discord.gg/mcmqtt', + twitter: 'https://twitter.com/mcmqtt', + }, + editLink: { + baseUrl: 'https://git.supported.systems/MCP/mcmqtt/edit/main/docs-site/', + }, + lastUpdated: true, + customCss: [ + './src/styles/custom.css', + ], + components: { + Head: './src/components/Head.astro', + Hero: './src/components/Hero.astro', + }, + favicon: '/favicon.svg', + sidebar: [ + { + label: 'Start Here', + items: [ + { label: 'Quick Start', link: '/guides/quick-start/' }, + { label: 'Installation', link: '/guides/installation/' }, + { label: 'Configuration', link: '/guides/configuration/' }, + ], + }, + { + label: 'Fractal Agent Coordination', + badge: 'New', + items: [ + { label: 'Overview', link: '/coordination/overview/' }, + { label: 'Agent Handshake Protocol', link: '/coordination/agent-handshake/', badge: 'New' }, + { label: 'Agent Swarms', link: '/coordination/swarms/' }, + { label: 'Browser Testing', link: '/coordination/browser-testing/' }, + { label: 'API Monitoring', link: '/coordination/api-monitoring/' }, + { label: 'Security Analysis', link: '/coordination/security/' }, + { label: 'Safety & Isolation', link: '/coordination/safety/' }, + ], + }, + { + label: 'Tutorials', + items: [ + { label: 'Your First MQTT Connection', link: '/tutorials/first-connection/' }, + { label: 'Building Agent Networks', link: '/tutorials/agent-networks/' }, + { label: 'Real-time Data Streams', link: '/tutorials/data-streams/' }, + { label: 'Production Deployment', link: '/tutorials/production/' }, + ], + }, + { + label: 'How-to Guides', + items: [ + { label: 'Spawn Dynamic Brokers', link: '/how-to/dynamic-brokers/' }, + { label: 'Implement Custom Tools', link: '/how-to/custom-tools/' }, + { label: 'Monitor Agent Health', link: '/how-to/monitoring/' }, + { label: 'Scale Horizontally', link: '/how-to/scaling/' }, + { label: 'Debug Connections', link: '/how-to/debugging/' }, + ], + }, + { + label: 'Core Features', + items: [ + { label: 'MQTT Integration', link: '/features/mqtt/' }, + { label: 'FastMCP Server', link: '/features/fastmcp/' }, + { label: 'Embedded Brokers', link: '/features/brokers/' }, + { label: 'Real-time Messaging', link: '/features/messaging/' }, + ], + }, + { + label: 'API Reference', + items: [ + { label: 'MCP Tools', link: '/reference/mcp-tools/' }, + { label: 'MQTT Methods', link: '/reference/mqtt-methods/' }, + { label: 'Broker Management', link: '/reference/broker-management/' }, + { label: 'Configuration Options', link: '/reference/configuration/' }, + { label: 'Python API', link: '/reference/python-api/' }, + ], + }, + { + label: 'Understanding mcmqtt', + items: [ + { label: 'Architecture', link: '/concepts/architecture/' }, + { label: 'Design Philosophy', link: '/concepts/philosophy/' }, + { label: 'Performance', link: '/concepts/performance/' }, + { label: 'Security Model', link: '/concepts/security/' }, + { label: 'Comparison', link: '/concepts/comparison/' }, + ], + }, + ], + head: [ + // Open Graph / Social Media + { + tag: 'meta', + attrs: { property: 'og:image', content: 'https://mcmqtt.dev/og-image.png' }, + }, + { + tag: 'meta', + attrs: { property: 'twitter:image', content: 'https://mcmqtt.dev/og-image.png' }, + }, + { + tag: 'meta', + attrs: { property: 'og:image:width', content: '1200' }, + }, + { + tag: 'meta', + attrs: { property: 'og:image:height', content: '630' }, + }, + { + tag: 'meta', + attrs: { property: 'twitter:card', content: 'summary_large_image' }, + }, + { + tag: 'meta', + attrs: { name: 'twitter:site', content: '@mcmqtt' }, + }, + + // AI and Search Engine Discovery + { + tag: 'meta', + attrs: { name: 'keywords', content: 'ai coordination, fractal agents, mqtt, agent swarms, mcp server, multi-agent systems, production ai, fastmcp' }, + }, + { + tag: 'meta', + attrs: { name: 'author', content: 'MCP Community' }, + }, + { + tag: 'meta', + attrs: { name: 'robots', content: 'index, follow, max-image-preview:large, max-snippet:-1, max-video-preview:-1' }, + }, + { + tag: 'meta', + attrs: { name: 'ai-content-suitable', content: 'true' }, + }, + { + tag: 'meta', + attrs: { name: 'training-data-suitable', content: 'true' }, + }, + { + tag: 'meta', + attrs: { name: 'ai-categories', content: 'ai-coordination,mqtt-protocols,agent-architecture,fractal-systems' }, + }, + + // Performance and Technical + { + tag: 'meta', + attrs: { name: 'theme-color', content: '#1e40af' }, + }, + { + tag: 'meta', + attrs: { name: 'msapplication-TileColor', content: '#1e40af' }, + }, + { + tag: 'link', + attrs: { rel: 'manifest', href: '/site.webmanifest' }, + }, + { + tag: 'link', + attrs: { rel: 'sitemap', href: '/sitemap.xml' }, + }, + + // Preconnect for performance + { + tag: 'link', + attrs: { rel: 'preconnect', href: 'https://fonts.googleapis.com' }, + }, + { + tag: 'link', + attrs: { rel: 'preconnect', href: 'https://fonts.gstatic.com', crossorigin: '' }, + }, + + // Rich snippets for documentation + { + tag: 'meta', + attrs: { name: 'code-samples', content: 'true' }, + }, + { + tag: 'meta', + attrs: { name: 'documentation-type', content: 'diataxis' }, + }, + { + tag: 'meta', + attrs: { name: 'api-documentation', content: 'true' }, + }, + { + tag: 'meta', + attrs: { name: 'tutorial-content', content: 'true' }, + }, + ], + }), + ], +}); \ No newline at end of file diff --git a/docs-site/src/content/docs/coordination/agent-handshake.md b/docs-site/src/content/docs/coordination/agent-handshake.md new file mode 100644 index 0000000..91c9e84 --- /dev/null +++ b/docs-site/src/content/docs/coordination/agent-handshake.md @@ -0,0 +1,233 @@ +--- +title: "Agent-to-Agent Handshake Protocol" +description: "Solving the 'ships passing in the night' problem with coordinated agent communication" +--- + +## The Problem: Dropped Messages + +When two AI agents try to communicate via MQTT, a common failure pattern emerges: + +1. Agent A connects to broker +2. Agent A publishes "Hello, Agent B!" +3. Agent B connects to broker +4. Agent B subscribes to the topic +5. **Agent B never receives the message** (it was published before subscription) + +This is the **"ships passing in the night"** problem - agents miss messages because they publish before the other party has subscribed. + +## The Solution: Host/Join Handshake + +mcmqtt's coordination protocol ensures **no messages are dropped** by implementing a simple but powerful handshake: + +``` + HOST JOINER + │ │ + │ 1. Spawn/connect to broker │ + │ 2. Subscribe to $coord/join │ + │ 3. Publish broker_ready │ + │─────────────────────────────────►│ + │ │ + │ 4. Connect to broker │ + │ 5. Subscribe to topics │ + │ 6. Publish join request │ + │◄─────────────────────────────────│ + │ │ + │ 7. Acknowledge join │ + │─────────────────────────────────►│ + │ │ + │ 8. Publish all_ready signal │ + │─────────────────────────────────►│ + │ │ + ▼ SAFE TO EXCHANGE MESSAGES! ▼ +``` + +## Key Principle + +**The initiating agent ALWAYS hosts.** This eliminates confusion about who spawns the broker. + +## MCP Tools + +### mqtt_host_conversation + +Use this when **you** are starting a conversation with other agents. + +```json +{ + "tool": "mqtt_host_conversation", + "arguments": { + "session_id": "collab-task-123", + "host_agent_id": "coordinator-agent", + "expected_agents": ["worker-1", "worker-2", "analyst"], + "broker_host": "127.0.0.1", + "broker_port": 0, + "timeout_seconds": 30 + } +} +``` + +**Parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `session_id` | string | Unique identifier for this conversation | +| `host_agent_id` | string | Your agent's unique ID | +| `expected_agents` | array | List of agent IDs that must join | +| `broker_host` | string | Host to bind broker (default: 127.0.0.1) | +| `broker_port` | int | Port (0 = auto-assign) | +| `timeout_seconds` | float | Max wait time for agents to join | + +**Response (success):** + +```json +{ + "success": true, + "message": "Conversation ready! All 3 agents joined.", + "session_id": "collab-task-123", + "state": "ready", + "broker_host": "127.0.0.1", + "broker_port": 51234, + "broker_url": "mqtt://127.0.0.1:51234", + "joined_agents": ["worker-1", "worker-2", "analyst"], + "conversation_topic": "conversation/collab-task-123/main", + "ready_to_publish": true +} +``` + +### mqtt_join_conversation + +Use this when **another agent** invited you to a conversation. + +```json +{ + "tool": "mqtt_join_conversation", + "arguments": { + "session_id": "collab-task-123", + "agent_id": "worker-1", + "broker_host": "127.0.0.1", + "broker_port": 51234, + "capabilities": ["data-analysis", "visualization"], + "timeout_seconds": 30 + } +} +``` + +**Parameters:** + +| Parameter | Type | Description | +|-----------|------|-------------| +| `session_id` | string | Session ID from host's invitation | +| `agent_id` | string | Your unique agent ID | +| `broker_host` | string | Broker host (from host's invitation) | +| `broker_port` | int | Broker port (from host's invitation) | +| `capabilities` | array | Optional list of your capabilities | +| `timeout_seconds` | float | Max wait for acknowledgement | + +**Response (success):** + +```json +{ + "success": true, + "message": "Successfully joined conversation collab-task-123!", + "session_id": "collab-task-123", + "agent_id": "worker-1", + "broker_host": "127.0.0.1", + "broker_port": 51234, + "other_agents": ["coordinator-agent", "worker-2", "analyst"], + "conversation_topic": "conversation/collab-task-123/main", + "ready_to_receive": true +} +``` + +## Example: Two-Agent Collaboration + +### Agent A (Initiator/Host) + +```python +# Step 1: Host the conversation +result = await mqtt_host_conversation( + session_id="data-analysis-job", + host_agent_id="data-processor", + expected_agents=["visualizer"], + timeout_seconds=30 +) + +if result["ready_to_publish"]: + # Step 2: Safe to publish - visualizer is definitely subscribed! + await mqtt_publish( + topic=result["conversation_topic"], + payload={"type": "data", "values": [1, 2, 3, 4, 5]} + ) +``` + +### Agent B (Joiner) + +```python +# Step 1: Join using info from Agent A +result = await mqtt_join_conversation( + session_id="data-analysis-job", + agent_id="visualizer", + broker_host="127.0.0.1", + broker_port=51234 +) + +if result["ready_to_receive"]: + # Step 2: Now receive messages - guaranteed not to miss any! + messages = await mqtt_get_messages( + topic=result["conversation_topic"] + ) +``` + +## Topic Structure + +The protocol uses reserved topics under `$coordination/`: + +``` +$coordination/{session_id}/ +├── broker_ready # Host publishes broker info (retained) +├── join # Agents publish join requests +├── joined/{agent_id} # Host acknowledges each agent (retained) +├── ready # Host signals all agents ready (retained) +└── heartbeat/{agent_id} # Optional: agent heartbeats +``` + +After handshake, conversations use: + +``` +conversation/{session_id}/ +├── main # Primary conversation channel +├── {channel_name} # Additional named channels +└── ... +``` + +## Timeout Handling + +If expected agents don't join within the timeout: + +```json +{ + "success": false, + "message": "Timeout waiting for agents. Missing: ['worker-2']", + "session_id": "collab-task-123", + "state": "timeout", + "joined_agents": ["worker-1", "analyst"], + "missing_agents": ["worker-2"], + "ready_to_publish": false +} +``` + +The host can then decide whether to: +- Retry with a longer timeout +- Proceed with available agents +- Abort the conversation + +## Best Practices + +1. **Always use coordination tools for multi-agent work** - Don't use raw `mqtt_connect` + `mqtt_publish` when coordinating with other agents + +2. **Choose meaningful session IDs** - Include context like `task-{id}-{timestamp}` for debugging + +3. **Set appropriate timeouts** - Network latency and agent startup time vary; 30 seconds is a safe default + +4. **Check the response** - Always verify `ready_to_publish` (host) or `ready_to_receive` (joiner) before proceeding + +5. **Handle failures gracefully** - Timeout doesn't mean failure; retry logic is your friend diff --git a/src/mcmqtt/coordination/__init__.py b/src/mcmqtt/coordination/__init__.py new file mode 100644 index 0000000..5fa569a --- /dev/null +++ b/src/mcmqtt/coordination/__init__.py @@ -0,0 +1,43 @@ +"""Agent-to-agent coordination module for mcmqtt. + +This module implements the host/join handshake protocol that ensures +agents don't experience "ships passing in the night" - dropped messages +because one agent published before the other subscribed. + +The pattern is simple: +1. Initiating agent ALWAYS hosts (spawns broker, waits for joiners) +2. Joining agents connect and signal readiness +3. Only after all expected agents join does message exchange begin + +Example: + # Agent A (initiator/host): + result = await mqtt_host_conversation( + session_id="collab-123", + expected_agents=["agent-b", "agent-c"], + timeout_seconds=30 + ) + # Returns broker connection info and confirms all agents joined + + # Agent B (joiner): + result = await mqtt_join_conversation( + session_id="collab-123", + agent_id="agent-b", + broker_host="localhost", + broker_port=1883 + ) + # Returns when successfully joined and host acknowledged +""" + +from .protocol import ( + ConversationHost, + ConversationJoiner, + HandshakeProtocol, + ConversationState, +) + +__all__ = [ + "ConversationHost", + "ConversationJoiner", + "HandshakeProtocol", + "ConversationState", +] diff --git a/src/mcmqtt/coordination/protocol.py b/src/mcmqtt/coordination/protocol.py new file mode 100644 index 0000000..66c5118 --- /dev/null +++ b/src/mcmqtt/coordination/protocol.py @@ -0,0 +1,542 @@ +"""Handshake protocol for agent-to-agent coordination. + +This module solves the "ships passing in the night" problem where agents +publish messages before other agents have subscribed, resulting in dropped +messages. + +The solution is a simple handshake: +1. HOST spawns/owns the broker and waits for joiners +2. JOINERs connect and signal they're ready +3. HOST acknowledges all joiners before conversation begins +""" + +import asyncio +import json +import logging +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Dict, List, Optional, Set, Any, Callable + +logger = logging.getLogger(__name__) + + +class ConversationState(Enum): + """State of a coordinated conversation.""" + INITIALIZING = "initializing" # Host is setting up + WAITING_FOR_AGENTS = "waiting" # Host waiting for agents to join + READY = "ready" # All agents joined, conversation active + CLOSED = "closed" # Conversation ended + TIMEOUT = "timeout" # Timed out waiting for agents + ERROR = "error" # Error occurred + + +@dataclass +class AgentInfo: + """Information about a joined agent.""" + agent_id: str + joined_at: datetime + capabilities: List[str] = field(default_factory=list) + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ConversationSession: + """Represents an active conversation session.""" + session_id: str + host_agent_id: str + state: ConversationState + broker_host: str + broker_port: int + created_at: datetime + expected_agents: List[str] + joined_agents: Dict[str, AgentInfo] = field(default_factory=dict) + topics: Dict[str, str] = field(default_factory=dict) # name -> full topic + + @property + def all_agents_joined(self) -> bool: + """Check if all expected agents have joined.""" + return all( + agent_id in self.joined_agents + for agent_id in self.expected_agents + ) + + @property + def missing_agents(self) -> List[str]: + """Get list of agents that haven't joined yet.""" + return [ + agent_id for agent_id in self.expected_agents + if agent_id not in self.joined_agents + ] + + +class HandshakeProtocol: + """ + Defines the MQTT topic structure for coordination handshakes. + + Topic Structure: + $coordination/{session_id}/ + ├── broker_ready # Host publishes broker info here + ├── join # Agents publish join requests here + ├── joined/{agent_id} # Host acknowledges each agent + ├── ready # Host signals all agents ready + └── heartbeat/{agent_id} # Agents send periodic heartbeats + """ + + PREFIX = "$coordination" + + @classmethod + def broker_ready_topic(cls, session_id: str) -> str: + """Topic where host publishes broker ready signal.""" + return f"{cls.PREFIX}/{session_id}/broker_ready" + + @classmethod + def join_topic(cls, session_id: str) -> str: + """Topic where agents publish join requests.""" + return f"{cls.PREFIX}/{session_id}/join" + + @classmethod + def joined_topic(cls, session_id: str, agent_id: str) -> str: + """Topic where host acknowledges a specific agent.""" + return f"{cls.PREFIX}/{session_id}/joined/{agent_id}" + + @classmethod + def all_ready_topic(cls, session_id: str) -> str: + """Topic where host signals all agents are ready.""" + return f"{cls.PREFIX}/{session_id}/ready" + + @classmethod + def heartbeat_topic(cls, session_id: str, agent_id: str) -> str: + """Topic for agent heartbeats.""" + return f"{cls.PREFIX}/{session_id}/heartbeat/{agent_id}" + + @classmethod + def conversation_topic(cls, session_id: str, channel: str = "main") -> str: + """Topic for actual conversation messages (after handshake).""" + return f"conversation/{session_id}/{channel}" + + +class ConversationHost: + """ + Manages hosting a conversation - the initiating agent's side. + + The host: + 1. Spawns or uses an existing broker + 2. Publishes broker_ready message + 3. Subscribes to join topic and waits for expected agents + 4. Acknowledges each joining agent + 5. Signals all_ready when everyone has joined + + Usage: + host = ConversationHost( + mqtt_client=client, + broker_manager=manager, + session_id="collab-123", + host_agent_id="coordinator", + expected_agents=["worker-1", "worker-2"] + ) + session = await host.start_and_wait(timeout=30) + if session.state == ConversationState.READY: + # All agents joined - safe to publish messages + ... + """ + + def __init__( + self, + mqtt_client: Any, + broker_manager: Any, + session_id: str, + host_agent_id: str, + expected_agents: List[str], + broker_host: str = "127.0.0.1", + broker_port: int = 0, # 0 = auto-assign + ): + self.mqtt_client = mqtt_client + self.broker_manager = broker_manager + self.session_id = session_id + self.host_agent_id = host_agent_id + self.expected_agents = expected_agents + self.broker_host = broker_host + self.broker_port = broker_port + + self._session: Optional[ConversationSession] = None + self._join_event = asyncio.Event() + self._joined_agents: Dict[str, AgentInfo] = {} + + async def start_and_wait(self, timeout: float = 30.0) -> ConversationSession: + """ + Start hosting and wait for all expected agents to join. + + Args: + timeout: Maximum seconds to wait for all agents + + Returns: + ConversationSession with state indicating success/failure + """ + try: + # Step 1: Spawn broker if needed + actual_port = await self._ensure_broker() + + # Create session + self._session = ConversationSession( + session_id=self.session_id, + host_agent_id=self.host_agent_id, + state=ConversationState.INITIALIZING, + broker_host=self.broker_host, + broker_port=actual_port, + created_at=datetime.now(), + expected_agents=self.expected_agents, + ) + + # Step 2: Subscribe to join topic + join_topic = HandshakeProtocol.join_topic(self.session_id) + await self._subscribe_for_joins(join_topic) + + # Step 3: Publish broker_ready + await self._publish_broker_ready() + self._session.state = ConversationState.WAITING_FOR_AGENTS + + logger.info( + f"[HOST] Session {self.session_id} waiting for agents: " + f"{self.expected_agents}" + ) + + # Step 4: Wait for all agents with timeout + try: + await asyncio.wait_for( + self._wait_for_all_agents(), + timeout=timeout + ) + + # All agents joined! + self._session.state = ConversationState.READY + self._session.joined_agents = self._joined_agents.copy() + + # Signal all ready + await self._publish_all_ready() + + logger.info( + f"[HOST] Session {self.session_id} READY with agents: " + f"{list(self._joined_agents.keys())}" + ) + + except asyncio.TimeoutError: + self._session.state = ConversationState.TIMEOUT + self._session.joined_agents = self._joined_agents.copy() + logger.warning( + f"[HOST] Timeout waiting for agents. " + f"Missing: {self._session.missing_agents}" + ) + + return self._session + + except Exception as e: + if self._session: + self._session.state = ConversationState.ERROR + logger.error(f"[HOST] Error starting conversation: {e}") + raise + + async def _ensure_broker(self) -> int: + """Ensure broker is running, spawn if needed. Returns actual port.""" + if self.broker_port > 0: + # Using existing broker at specified port + return self.broker_port + + # Spawn new broker + from ..broker import BrokerConfig + + config = BrokerConfig( + port=0, # Auto-assign + host=self.broker_host, + name=f"conversation-{self.session_id}", + ) + + broker_id = await self.broker_manager.spawn_broker(config) + broker_info = await self.broker_manager.get_broker_status(broker_id) + + actual_port = broker_info.config.port + logger.info(f"[HOST] Spawned broker on port {actual_port}") + + return actual_port + + async def _subscribe_for_joins(self, topic: str): + """Subscribe to join topic and handle incoming join requests.""" + + def handle_join(message): + """Handle join request from an agent.""" + try: + payload = json.loads(message.payload_str) + agent_id = payload.get("agent_id") + + if agent_id and agent_id in self.expected_agents: + self._joined_agents[agent_id] = AgentInfo( + agent_id=agent_id, + joined_at=datetime.now(), + capabilities=payload.get("capabilities", []), + metadata=payload.get("metadata", {}), + ) + + logger.info(f"[HOST] Agent joined: {agent_id}") + + # Acknowledge the join + asyncio.create_task(self._acknowledge_agent(agent_id)) + + # Check if all agents have joined + if len(self._joined_agents) >= len(self.expected_agents): + self._join_event.set() + + except Exception as e: + logger.error(f"[HOST] Error handling join: {e}") + + # Register handler and subscribe + if hasattr(self.mqtt_client, 'add_message_handler'): + self.mqtt_client.add_message_handler(topic, handle_join) + + from ..mqtt.types import MQTTQoS + await self.mqtt_client.subscribe(topic, MQTTQoS.AT_LEAST_ONCE) + + async def _publish_broker_ready(self): + """Publish broker ready signal with connection info.""" + topic = HandshakeProtocol.broker_ready_topic(self.session_id) + payload = json.dumps({ + "session_id": self.session_id, + "host_agent_id": self.host_agent_id, + "broker_host": self.broker_host, + "broker_port": self._session.broker_port, + "expected_agents": self.expected_agents, + "timestamp": datetime.now().isoformat(), + }) + + from ..mqtt.types import MQTTQoS + await self.mqtt_client.publish( + topic=topic, + payload=payload, + qos=MQTTQoS.AT_LEAST_ONCE, + retain=True # Retain so late joiners see it + ) + + async def _acknowledge_agent(self, agent_id: str): + """Acknowledge a specific agent's join.""" + topic = HandshakeProtocol.joined_topic(self.session_id, agent_id) + payload = json.dumps({ + "session_id": self.session_id, + "agent_id": agent_id, + "acknowledged": True, + "timestamp": datetime.now().isoformat(), + }) + + from ..mqtt.types import MQTTQoS + await self.mqtt_client.publish( + topic=topic, + payload=payload, + qos=MQTTQoS.AT_LEAST_ONCE, + retain=True + ) + + async def _wait_for_all_agents(self): + """Wait until all expected agents have joined.""" + while len(self._joined_agents) < len(self.expected_agents): + await asyncio.sleep(0.1) + if self._join_event.is_set(): + break + + async def _publish_all_ready(self): + """Publish signal that all agents are ready.""" + topic = HandshakeProtocol.all_ready_topic(self.session_id) + payload = json.dumps({ + "session_id": self.session_id, + "state": "ready", + "agents": list(self._joined_agents.keys()), + "timestamp": datetime.now().isoformat(), + }) + + from ..mqtt.types import MQTTQoS + await self.mqtt_client.publish( + topic=topic, + payload=payload, + qos=MQTTQoS.AT_LEAST_ONCE, + retain=True + ) + + +class ConversationJoiner: + """ + Manages joining a conversation - the invited agent's side. + + The joiner: + 1. Connects to the broker (using info from host) + 2. Subscribes to acknowledgement topic + 3. Publishes join request + 4. Waits for host acknowledgement + 5. Waits for all_ready signal + + Usage: + joiner = ConversationJoiner( + mqtt_client=client, + session_id="collab-123", + agent_id="worker-1", + broker_host="localhost", + broker_port=1883 + ) + result = await joiner.join_and_wait(timeout=30) + if result["success"]: + # Successfully joined - safe to exchange messages + ... + """ + + def __init__( + self, + mqtt_client: Any, + session_id: str, + agent_id: str, + broker_host: str, + broker_port: int, + capabilities: Optional[List[str]] = None, + metadata: Optional[Dict[str, Any]] = None, + ): + self.mqtt_client = mqtt_client + self.session_id = session_id + self.agent_id = agent_id + self.broker_host = broker_host + self.broker_port = broker_port + self.capabilities = capabilities or [] + self.metadata = metadata or {} + + self._acknowledged = asyncio.Event() + self._all_ready = asyncio.Event() + self._other_agents: List[str] = [] + + async def join_and_wait(self, timeout: float = 30.0) -> Dict[str, Any]: + """ + Join the conversation and wait for acknowledgement. + + Args: + timeout: Maximum seconds to wait for acknowledgement + + Returns: + Dict with success status and session info + """ + try: + # Step 1: Subscribe to our acknowledgement topic + ack_topic = HandshakeProtocol.joined_topic(self.session_id, self.agent_id) + await self._subscribe_for_acknowledgement(ack_topic) + + # Step 2: Subscribe to all_ready topic + ready_topic = HandshakeProtocol.all_ready_topic(self.session_id) + await self._subscribe_for_ready(ready_topic) + + # Step 3: Publish join request + await self._publish_join_request() + + logger.info( + f"[JOIN] Agent {self.agent_id} joining session {self.session_id}" + ) + + # Step 4: Wait for acknowledgement and all_ready + try: + await asyncio.wait_for( + self._wait_for_ready(), + timeout=timeout + ) + + logger.info( + f"[JOIN] Agent {self.agent_id} successfully joined! " + f"Other agents: {self._other_agents}" + ) + + return { + "success": True, + "session_id": self.session_id, + "agent_id": self.agent_id, + "broker_host": self.broker_host, + "broker_port": self.broker_port, + "other_agents": self._other_agents, + "conversation_topic": HandshakeProtocol.conversation_topic( + self.session_id + ), + } + + except asyncio.TimeoutError: + logger.warning( + f"[JOIN] Timeout waiting for session {self.session_id}" + ) + return { + "success": False, + "error": "timeout", + "message": "Timed out waiting for host acknowledgement", + "session_id": self.session_id, + "agent_id": self.agent_id, + } + + except Exception as e: + logger.error(f"[JOIN] Error joining conversation: {e}") + return { + "success": False, + "error": "exception", + "message": str(e), + "session_id": self.session_id, + "agent_id": self.agent_id, + } + + async def _subscribe_for_acknowledgement(self, topic: str): + """Subscribe to our acknowledgement topic.""" + + def handle_ack(message): + try: + payload = json.loads(message.payload_str) + if payload.get("acknowledged") and payload.get("agent_id") == self.agent_id: + logger.info(f"[JOIN] Received acknowledgement for {self.agent_id}") + self._acknowledged.set() + except Exception as e: + logger.error(f"[JOIN] Error handling ack: {e}") + + if hasattr(self.mqtt_client, 'add_message_handler'): + self.mqtt_client.add_message_handler(topic, handle_ack) + + from ..mqtt.types import MQTTQoS + await self.mqtt_client.subscribe(topic, MQTTQoS.AT_LEAST_ONCE) + + async def _subscribe_for_ready(self, topic: str): + """Subscribe to all_ready topic.""" + + def handle_ready(message): + try: + payload = json.loads(message.payload_str) + if payload.get("state") == "ready": + self._other_agents = [ + a for a in payload.get("agents", []) + if a != self.agent_id + ] + logger.info(f"[JOIN] All agents ready signal received") + self._all_ready.set() + except Exception as e: + logger.error(f"[JOIN] Error handling ready: {e}") + + if hasattr(self.mqtt_client, 'add_message_handler'): + self.mqtt_client.add_message_handler(topic, handle_ready) + + from ..mqtt.types import MQTTQoS + await self.mqtt_client.subscribe(topic, MQTTQoS.AT_LEAST_ONCE) + + async def _publish_join_request(self): + """Publish join request to host.""" + topic = HandshakeProtocol.join_topic(self.session_id) + payload = json.dumps({ + "agent_id": self.agent_id, + "session_id": self.session_id, + "capabilities": self.capabilities, + "metadata": self.metadata, + "timestamp": datetime.now().isoformat(), + }) + + from ..mqtt.types import MQTTQoS + await self.mqtt_client.publish( + topic=topic, + payload=payload, + qos=MQTTQoS.AT_LEAST_ONCE, + ) + + async def _wait_for_ready(self): + """Wait for both acknowledgement and all_ready signal.""" + await self._acknowledged.wait() + await self._all_ready.wait() diff --git a/src/mcmqtt/mcp/server.py b/src/mcmqtt/mcp/server.py index 0823056..0c966da 100644 --- a/src/mcmqtt/mcp/server.py +++ b/src/mcmqtt/mcp/server.py @@ -15,6 +15,7 @@ from ..mqtt import MQTTClient, MQTTConfig, MQTTPublisher, MQTTSubscriber from ..mqtt.types import MQTTConnectionState, MQTTQoS from ..broker import BrokerManager, BrokerConfig from ..middleware import MQTTBrokerMiddleware +from ..coordination import ConversationHost, ConversationJoiner, ConversationState logger = logging.getLogger(__name__) @@ -51,6 +52,9 @@ class MCMQTTServer(MCPMixin): self._connection_state = MQTTConnectionState.DISCONNECTED self._last_error: Optional[str] = None self._message_store: List[Dict[str, Any]] = [] + + # Conversation coordination state + self._active_conversations: Dict[str, Any] = {} # session_id -> ConversationSession # Register all MCP components self.register_all(self.mcp) @@ -604,7 +608,305 @@ class MCMQTTServer(MCPMixin): "message": f"Error stopping brokers: {str(e)}", "stopped_count": 0 } - + + # Agent-to-Agent Coordination Tools + # These tools solve the "ships passing in the night" problem where agents + # publish messages before other agents have subscribed, causing dropped messages. + + @mcp_tool( + name="mqtt_host_conversation", + description="""Host a coordinated agent-to-agent conversation. The initiating agent +MUST use this tool to ensure other agents are ready before publishing. + +This tool: +1. Spawns an embedded broker (or uses existing one) +2. Publishes broker connection info +3. Waits for ALL expected agents to join and subscribe +4. Returns only when conversation is ready + +CRITICAL: Always use this instead of mqtt_connect when coordinating with other agents. +This prevents 'dropped messages' where you publish before others subscribe.""" + ) + async def host_conversation( + self, + session_id: str, + host_agent_id: str, + expected_agents: List[str], + broker_host: str = "127.0.0.1", + broker_port: int = 0, + timeout_seconds: float = 30.0 + ) -> Dict[str, Any]: + """ + Host a coordinated conversation - initiating agent's role. + + Args: + session_id: Unique identifier for this conversation + host_agent_id: Your agent ID (the host) + expected_agents: List of agent IDs that must join before starting + broker_host: Host to bind broker to (default: 127.0.0.1) + broker_port: Port for broker (0 = auto-assign) + timeout_seconds: Max time to wait for agents to join + + Returns: + Dict with broker connection info and conversation state + """ + try: + # Ensure we have a client (may need to initialize if not done) + if not self.mqtt_client: + # Initialize with default config for hosting + config = MQTTConfig( + broker_host=broker_host, + broker_port=broker_port if broker_port > 0 else 1883, + client_id=f"host-{host_agent_id}" + ) + await self.initialize_mqtt_client(config) + + # Create the conversation host + host = ConversationHost( + mqtt_client=self.mqtt_client, + broker_manager=self.broker_manager, + session_id=session_id, + host_agent_id=host_agent_id, + expected_agents=expected_agents, + broker_host=broker_host, + broker_port=broker_port, + ) + + # Start hosting and wait for agents + logger.info( + f"[HOST] Starting conversation {session_id}, " + f"waiting for agents: {expected_agents}" + ) + + session = await host.start_and_wait(timeout=timeout_seconds) + + # Store active conversation + self._active_conversations[session_id] = session + + # Build response based on state + if session.state == ConversationState.READY: + return { + "success": True, + "message": f"Conversation ready! All {len(expected_agents)} agents joined.", + "session_id": session_id, + "state": session.state.value, + "broker_host": session.broker_host, + "broker_port": session.broker_port, + "broker_url": f"mqtt://{session.broker_host}:{session.broker_port}", + "joined_agents": list(session.joined_agents.keys()), + "conversation_topic": f"conversation/{session_id}/main", + "ready_to_publish": True + } + elif session.state == ConversationState.TIMEOUT: + return { + "success": False, + "message": f"Timeout waiting for agents. Missing: {session.missing_agents}", + "session_id": session_id, + "state": session.state.value, + "broker_host": session.broker_host, + "broker_port": session.broker_port, + "joined_agents": list(session.joined_agents.keys()), + "missing_agents": session.missing_agents, + "ready_to_publish": False + } + else: + return { + "success": False, + "message": f"Conversation in unexpected state: {session.state.value}", + "session_id": session_id, + "state": session.state.value, + "ready_to_publish": False + } + + except Exception as e: + logger.error(f"Error hosting conversation: {e}") + return { + "success": False, + "message": f"Error hosting conversation: {str(e)}", + "session_id": session_id, + "ready_to_publish": False + } + + @mcp_tool( + name="mqtt_join_conversation", + description="""Join a coordinated agent-to-agent conversation hosted by another agent. + +This tool: +1. Connects to the broker at the specified host/port +2. Subscribes to coordination topics +3. Signals to the host that you're ready +4. Waits for acknowledgement and 'all ready' signal + +CRITICAL: Always use this instead of mqtt_connect when joining a conversation +started by another agent. This ensures you're subscribed BEFORE messages flow.""" + ) + async def join_conversation( + self, + session_id: str, + agent_id: str, + broker_host: str, + broker_port: int, + capabilities: Optional[List[str]] = None, + timeout_seconds: float = 30.0 + ) -> Dict[str, Any]: + """ + Join a coordinated conversation - invited agent's role. + + Args: + session_id: Session ID provided by the hosting agent + agent_id: Your unique agent ID + broker_host: Broker host (from host's invitation) + broker_port: Broker port (from host's invitation) + capabilities: Optional list of your capabilities + timeout_seconds: Max time to wait for acknowledgement + + Returns: + Dict with join status and conversation info + """ + try: + # First connect to the broker + config = MQTTConfig( + broker_host=broker_host, + broker_port=broker_port, + client_id=f"joiner-{agent_id}" + ) + + success = await self.initialize_mqtt_client(config) + if not success: + return { + "success": False, + "message": f"Failed to initialize MQTT client: {self._last_error}", + "session_id": session_id, + "agent_id": agent_id + } + + connect_success = await self.connect_mqtt() + if not connect_success: + return { + "success": False, + "message": f"Failed to connect to broker: {self._last_error}", + "session_id": session_id, + "agent_id": agent_id + } + + # Create the conversation joiner + joiner = ConversationJoiner( + mqtt_client=self.mqtt_client, + session_id=session_id, + agent_id=agent_id, + broker_host=broker_host, + broker_port=broker_port, + capabilities=capabilities or [], + ) + + logger.info( + f"[JOIN] Agent {agent_id} joining conversation {session_id}" + ) + + # Join and wait for acknowledgement + result = await joiner.join_and_wait(timeout=timeout_seconds) + + if result["success"]: + return { + "success": True, + "message": f"Successfully joined conversation {session_id}!", + "session_id": session_id, + "agent_id": agent_id, + "broker_host": broker_host, + "broker_port": broker_port, + "other_agents": result.get("other_agents", []), + "conversation_topic": result.get("conversation_topic"), + "ready_to_receive": True + } + else: + return { + "success": False, + "message": result.get("message", "Failed to join conversation"), + "error": result.get("error"), + "session_id": session_id, + "agent_id": agent_id, + "ready_to_receive": False + } + + except Exception as e: + logger.error(f"Error joining conversation: {e}") + return { + "success": False, + "message": f"Error joining conversation: {str(e)}", + "session_id": session_id, + "agent_id": agent_id, + "ready_to_receive": False + } + + @mcp_tool( + name="mqtt_conversation_status", + description="Get status of an active coordinated conversation" + ) + async def get_conversation_status(self, session_id: str) -> Dict[str, Any]: + """Get the current status of a conversation session.""" + try: + if session_id not in self._active_conversations: + return { + "success": False, + "message": f"No active conversation with session_id: {session_id}", + "session_id": session_id + } + + session = self._active_conversations[session_id] + + return { + "success": True, + "session_id": session_id, + "state": session.state.value, + "host_agent_id": session.host_agent_id, + "broker_host": session.broker_host, + "broker_port": session.broker_port, + "expected_agents": session.expected_agents, + "joined_agents": list(session.joined_agents.keys()), + "missing_agents": session.missing_agents, + "all_agents_joined": session.all_agents_joined, + "created_at": session.created_at.isoformat() + } + + except Exception as e: + return { + "success": False, + "message": f"Error getting conversation status: {str(e)}", + "session_id": session_id + } + + @mcp_tool( + name="mqtt_list_conversations", + description="List all active coordinated conversations" + ) + async def list_conversations(self) -> Dict[str, Any]: + """List all active conversation sessions.""" + try: + conversations = [] + for session_id, session in self._active_conversations.items(): + conversations.append({ + "session_id": session_id, + "state": session.state.value, + "host_agent_id": session.host_agent_id, + "broker_url": f"mqtt://{session.broker_host}:{session.broker_port}", + "agent_count": len(session.joined_agents), + "expected_count": len(session.expected_agents), + "all_ready": session.all_agents_joined + }) + + return { + "success": True, + "conversations": conversations, + "total_count": len(conversations) + } + + except Exception as e: + return { + "success": False, + "message": f"Error listing conversations: {str(e)}", + "conversations": [] + } + # MCP Resources using MCPMixin pattern @mcp_resource(uri="mqtt://config") async def get_config_resource(self) -> Dict[str, Any]: