Add agent-to-agent coordination protocol with host/join handshake

Solves the "ships passing in the night" problem where agents publish
messages before other agents have subscribed, causing dropped messages.

New tools:
- mqtt_host_conversation: Initiating agent hosts and waits for joiners
- mqtt_join_conversation: Joining agents connect and signal ready
- mqtt_conversation_status: Check conversation state
- mqtt_list_conversations: List active conversations

The protocol guarantees no messages are dropped by ensuring all expected
agents are subscribed before the host begins publishing.
Ryan Malloy 2026-02-07 04:40:30 -07:00
parent b95536c388
commit 33189816f2
6 changed files with 1408 additions and 3 deletions


@ -130,14 +130,88 @@ cd examples/fractal-agent-coordination/
### 🏃‍♂️ FastMCP MQTT Tools
**Core MQTT Tools:**
- `mqtt_connect` - Connect to MQTT brokers
- `mqtt_publish` - Publish messages with QoS support
- `mqtt_subscribe` - Subscribe to topics with wildcards
- `mqtt_get_messages` - Retrieve received messages
- `mqtt_status` - Get connection and statistics
- `mqtt_spawn_broker` - Create embedded brokers instantly
- `mqtt_list_brokers` - Manage multiple brokers
**Agent Coordination Tools (NEW!):**
- `mqtt_host_conversation` - **Host** a coordinated agent-to-agent conversation
- `mqtt_join_conversation` - **Join** a conversation hosted by another agent
- `mqtt_conversation_status` - Get status of active conversations
- `mqtt_list_conversations` - List all active conversations
### 🤝 Agent-to-Agent Coordination Protocol
**Solving the "Ships Passing in the Night" Problem**
When two agents try to communicate via MQTT, a common issue occurs: Agent A publishes a message before Agent B has subscribed, resulting in dropped messages. mcmqtt's coordination protocol solves this with a **host/join handshake**:
```
┌─────────────────┐                    ┌─────────────────┐
│    Agent A      │                    │    Agent B      │
│  (Initiator)    │                    │   (Invited)     │
└────────┬────────┘                    └────────┬────────┘
         │                                      │
         │  1. mqtt_host_conversation()         │
         │     - Spawns broker                  │
         │     - Waits for Agent B              │
         │ ───────────────────────────────────► │
         │                                      │
         │         2. mqtt_join_conversation()  │
         │            - Connects to broker      │
         │            - Signals ready           │
         │ ◄─────────────────────────────────── │
         │                                      │
         │  3. Host acknowledges join           │
         │ ───────────────────────────────────► │
         │                                      │
         │  4. All ready - safe to publish!     │
         │ ◄──────────────────────────────────► │
         └──────────────────────────────────────┘
```
**Usage Example:**
```python
# Agent A (initiating the conversation):
result = await mqtt_host_conversation(
    session_id="task-collaboration-123",
    host_agent_id="coordinator",
    expected_agents=["worker-1", "worker-2"],
    timeout_seconds=30
)
# Returns once ALL expected agents have joined and subscribed, or when the timeout expires.
if result["ready_to_publish"]:
    await mqtt_publish(topic=result["conversation_topic"], payload="Hello team!")
```
```python
# Agent B (joining the conversation):
result = await mqtt_join_conversation(
    session_id="task-collaboration-123",
    agent_id="worker-1",
    broker_host="127.0.0.1",
    broker_port=1883  # must be the port of the host's broker
)
# Returns after the host acknowledges and all agents are ready, or on timeout.
if result["ready_to_receive"]:
    # Now safe to receive messages - guaranteed not to miss any!
    messages = await mqtt_get_messages(topic="conversation/task-collaboration-123/main")
```
**Key Guarantees:**
- **No dropped messages** - Host waits for all agents to subscribe before publishing
- **Clear ownership** - Initiating agent always hosts the broker
- **Timeout protection** - Configurable timeout prevents infinite waits
- **State visibility** - Check conversation status at any time
### 🔧 Embedded Broker Management
**MCP clients can spawn MQTT brokers on-demand using the `mqtt_spawn_broker` tool:**
@ -227,12 +301,13 @@ This isn't your typical monolithic MQTT library. mcmqtt features a **clean modul
```
mcmqtt/
├── cli/           # Command-line interface & argument parsing
├── config/        # Environment & configuration management
├── logging/       # Structured logging setup
├── server/        # STDIO & HTTP server runners
├── mqtt/          # Core MQTT client functionality
├── mcp/           # FastMCP server integration
├── broker/        # Embedded broker management
├── coordination/  # Agent-to-agent handshake protocol (NEW!)
└── middleware/    # Broker middleware & orchestration
```

docs-site/astro.config.mjs Normal file

@ -0,0 +1,210 @@
import { defineConfig } from 'astro/config';
import starlight from '@astrojs/starlight';
import alpinejs from '@astrojs/alpinejs';
export default defineConfig({
site: 'https://mcmqtt.dev',
base: '/',
trailingSlash: 'ignore',
build: {
format: 'directory',
},
integrations: [
alpinejs(),
starlight({
title: 'mcmqtt - AI Coordination Platform',
description: 'The definitive platform for AI coordination with revolutionary fractal agent swarms. Deploy sophisticated multi-agent systems with zero-config infrastructure and production-grade safety.',
logo: {
src: './src/assets/mcmqtt-logo.svg',
replacesTitle: true,
},
social: {
github: 'https://git.supported.systems/MCP/mcmqtt',
discord: 'https://discord.gg/mcmqtt',
twitter: 'https://twitter.com/mcmqtt',
},
editLink: {
baseUrl: 'https://git.supported.systems/MCP/mcmqtt/edit/main/docs-site/',
},
lastUpdated: true,
customCss: [
'./src/styles/custom.css',
],
components: {
Head: './src/components/Head.astro',
Hero: './src/components/Hero.astro',
},
favicon: '/favicon.svg',
sidebar: [
{
label: 'Start Here',
items: [
{ label: 'Quick Start', link: '/guides/quick-start/' },
{ label: 'Installation', link: '/guides/installation/' },
{ label: 'Configuration', link: '/guides/configuration/' },
],
},
{
label: 'Fractal Agent Coordination',
badge: 'New',
items: [
{ label: 'Overview', link: '/coordination/overview/' },
{ label: 'Agent Handshake Protocol', link: '/coordination/agent-handshake/', badge: 'New' },
{ label: 'Agent Swarms', link: '/coordination/swarms/' },
{ label: 'Browser Testing', link: '/coordination/browser-testing/' },
{ label: 'API Monitoring', link: '/coordination/api-monitoring/' },
{ label: 'Security Analysis', link: '/coordination/security/' },
{ label: 'Safety & Isolation', link: '/coordination/safety/' },
],
},
{
label: 'Tutorials',
items: [
{ label: 'Your First MQTT Connection', link: '/tutorials/first-connection/' },
{ label: 'Building Agent Networks', link: '/tutorials/agent-networks/' },
{ label: 'Real-time Data Streams', link: '/tutorials/data-streams/' },
{ label: 'Production Deployment', link: '/tutorials/production/' },
],
},
{
label: 'How-to Guides',
items: [
{ label: 'Spawn Dynamic Brokers', link: '/how-to/dynamic-brokers/' },
{ label: 'Implement Custom Tools', link: '/how-to/custom-tools/' },
{ label: 'Monitor Agent Health', link: '/how-to/monitoring/' },
{ label: 'Scale Horizontally', link: '/how-to/scaling/' },
{ label: 'Debug Connections', link: '/how-to/debugging/' },
],
},
{
label: 'Core Features',
items: [
{ label: 'MQTT Integration', link: '/features/mqtt/' },
{ label: 'FastMCP Server', link: '/features/fastmcp/' },
{ label: 'Embedded Brokers', link: '/features/brokers/' },
{ label: 'Real-time Messaging', link: '/features/messaging/' },
],
},
{
label: 'API Reference',
items: [
{ label: 'MCP Tools', link: '/reference/mcp-tools/' },
{ label: 'MQTT Methods', link: '/reference/mqtt-methods/' },
{ label: 'Broker Management', link: '/reference/broker-management/' },
{ label: 'Configuration Options', link: '/reference/configuration/' },
{ label: 'Python API', link: '/reference/python-api/' },
],
},
{
label: 'Understanding mcmqtt',
items: [
{ label: 'Architecture', link: '/concepts/architecture/' },
{ label: 'Design Philosophy', link: '/concepts/philosophy/' },
{ label: 'Performance', link: '/concepts/performance/' },
{ label: 'Security Model', link: '/concepts/security/' },
{ label: 'Comparison', link: '/concepts/comparison/' },
],
},
],
head: [
// Open Graph / Social Media
{
tag: 'meta',
attrs: { property: 'og:image', content: 'https://mcmqtt.dev/og-image.png' },
},
{
tag: 'meta',
attrs: { property: 'twitter:image', content: 'https://mcmqtt.dev/og-image.png' },
},
{
tag: 'meta',
attrs: { property: 'og:image:width', content: '1200' },
},
{
tag: 'meta',
attrs: { property: 'og:image:height', content: '630' },
},
{
tag: 'meta',
attrs: { property: 'twitter:card', content: 'summary_large_image' },
},
{
tag: 'meta',
attrs: { name: 'twitter:site', content: '@mcmqtt' },
},
// AI and Search Engine Discovery
{
tag: 'meta',
attrs: { name: 'keywords', content: 'ai coordination, fractal agents, mqtt, agent swarms, mcp server, multi-agent systems, production ai, fastmcp' },
},
{
tag: 'meta',
attrs: { name: 'author', content: 'MCP Community' },
},
{
tag: 'meta',
attrs: { name: 'robots', content: 'index, follow, max-image-preview:large, max-snippet:-1, max-video-preview:-1' },
},
{
tag: 'meta',
attrs: { name: 'ai-content-suitable', content: 'true' },
},
{
tag: 'meta',
attrs: { name: 'training-data-suitable', content: 'true' },
},
{
tag: 'meta',
attrs: { name: 'ai-categories', content: 'ai-coordination,mqtt-protocols,agent-architecture,fractal-systems' },
},
// Performance and Technical
{
tag: 'meta',
attrs: { name: 'theme-color', content: '#1e40af' },
},
{
tag: 'meta',
attrs: { name: 'msapplication-TileColor', content: '#1e40af' },
},
{
tag: 'link',
attrs: { rel: 'manifest', href: '/site.webmanifest' },
},
{
tag: 'link',
attrs: { rel: 'sitemap', href: '/sitemap.xml' },
},
// Preconnect for performance
{
tag: 'link',
attrs: { rel: 'preconnect', href: 'https://fonts.googleapis.com' },
},
{
tag: 'link',
attrs: { rel: 'preconnect', href: 'https://fonts.gstatic.com', crossorigin: '' },
},
// Rich snippets for documentation
{
tag: 'meta',
attrs: { name: 'code-samples', content: 'true' },
},
{
tag: 'meta',
attrs: { name: 'documentation-type', content: 'diataxis' },
},
{
tag: 'meta',
attrs: { name: 'api-documentation', content: 'true' },
},
{
tag: 'meta',
attrs: { name: 'tutorial-content', content: 'true' },
},
],
}),
],
});


@ -0,0 +1,233 @@
---
title: "Agent-to-Agent Handshake Protocol"
description: "Solving the 'ships passing in the night' problem with coordinated agent communication"
---
## The Problem: Dropped Messages
When two AI agents try to communicate via MQTT, a common failure pattern emerges:
1. Agent A connects to broker
2. Agent A publishes "Hello, Agent B!"
3. Agent B connects to broker
4. Agent B subscribes to the topic
5. **Agent B never receives the message** (it was published before subscription)
This is the **"ships passing in the night"** problem - agents miss messages because they publish before the other party has subscribed.
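The failure can be reproduced without a real broker. The sketch below uses a toy in-memory publish/subscribe hub (`ToyBroker` is a hypothetical stand-in, not part of mcmqtt) that, like a broker handling a non-retained message, delivers only to subscribers present at publish time:

```python
from collections import defaultdict


class ToyBroker:
    """Hypothetical in-memory stand-in for an MQTT broker (no retained messages)."""

    def __init__(self):
        self.subscribers = defaultdict(list)  # topic -> list of inboxes

    def subscribe(self, topic):
        inbox = []
        self.subscribers[topic].append(inbox)
        return inbox

    def publish(self, topic, payload):
        # Deliver only to inboxes that exist right now; nothing is stored.
        for inbox in self.subscribers[topic]:
            inbox.append(payload)


broker = ToyBroker()
broker.publish("chat", "Hello, Agent B!")   # Agent A publishes first
inbox_b = broker.subscribe("chat")          # Agent B subscribes too late
broker.publish("chat", "Are you there?")    # only this message arrives

print(inbox_b)  # ['Are you there?']
```

The first message is gone for good: nothing in this model replays it once Agent B subscribes.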
## The Solution: Host/Join Handshake
mcmqtt's coordination protocol ensures **no messages are dropped** by implementing a simple but powerful handshake:
```
HOST                                                JOINER
  │                                                    │
  │  1. Spawn/connect to broker                        │
  │  2. Subscribe to $coordination/{session_id}/join   │
  │  3. Publish broker_ready                           │
  │───────────────────────────────────────────────────►│
  │                                                    │
  │                        4. Connect to broker        │
  │                        5. Subscribe to topics      │
  │                        6. Publish join request     │
  │◄───────────────────────────────────────────────────│
  │                                                    │
  │  7. Acknowledge join                               │
  │───────────────────────────────────────────────────►│
  │                                                    │
  │  8. Publish all_ready signal                       │
  │───────────────────────────────────────────────────►│
  │                                                    │
  ▼             SAFE TO EXCHANGE MESSAGES!             ▼
```
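The handshake in the diagram can be modelled in a few lines of `asyncio`. This is a minimal sketch under stated assumptions, not mcmqtt's implementation: an `asyncio.Queue` stands in for the join topic, `asyncio.Event` objects stand in for the acknowledgement and all_ready topics, and the function names `host`, `joiner`, and `main` are hypothetical.

```python
import asyncio


async def host(join_queue, acks, all_ready, expected, timeout):
    """Collect join requests until every expected agent has joined."""
    joined = set()

    async def collect():
        while joined != set(expected):
            agent_id = await join_queue.get()   # step 6: join request arrives
            if agent_id in expected:
                joined.add(agent_id)
                acks[agent_id].set()            # step 7: acknowledge join

    try:
        await asyncio.wait_for(collect(), timeout)
    except asyncio.TimeoutError:
        return {"state": "timeout", "missing": sorted(set(expected) - joined)}
    all_ready.set()                             # step 8: all_ready signal
    return {"state": "ready", "joined": sorted(joined)}


async def joiner(agent_id, join_queue, acks, all_ready):
    await join_queue.put(agent_id)              # publish join request
    await acks[agent_id].wait()                 # wait for acknowledgement
    await all_ready.wait()                      # wait for all_ready
    return agent_id


async def main():
    expected = ["worker-1", "worker-2"]
    join_queue = asyncio.Queue()
    acks = {agent: asyncio.Event() for agent in expected}
    all_ready = asyncio.Event()
    results = await asyncio.gather(
        host(join_queue, acks, all_ready, expected, timeout=1.0),
        *(joiner(agent, join_queue, acks, all_ready) for agent in expected),
    )
    return results[0]


result = asyncio.run(main())
print(result)
```

No joiner returns before the host has seen every expected agent, which is the property the real protocol relies on before conversation messages are published.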
## Key Principle
**The initiating agent ALWAYS hosts.** This eliminates confusion about who spawns the broker.
## MCP Tools
### mqtt_host_conversation
Use this when **you** are starting a conversation with other agents.
```json
{
  "tool": "mqtt_host_conversation",
  "arguments": {
    "session_id": "collab-task-123",
    "host_agent_id": "coordinator-agent",
    "expected_agents": ["worker-1", "worker-2", "analyst"],
    "broker_host": "127.0.0.1",
    "broker_port": 0,
    "timeout_seconds": 30
  }
}
```
**Parameters:**
| Parameter | Type | Description |
|-----------|------|-------------|
| `session_id` | string | Unique identifier for this conversation |
| `host_agent_id` | string | Your agent's unique ID |
| `expected_agents` | array | List of agent IDs that must join |
| `broker_host` | string | Host to bind broker (default: 127.0.0.1) |
| `broker_port` | int | Port (0 = auto-assign) |
| `timeout_seconds` | float | Max wait time for agents to join |
**Response (success):**
```json
{
  "success": true,
  "message": "Conversation ready! All 3 agents joined.",
  "session_id": "collab-task-123",
  "state": "ready",
  "broker_host": "127.0.0.1",
  "broker_port": 51234,
  "broker_url": "mqtt://127.0.0.1:51234",
  "joined_agents": ["worker-1", "worker-2", "analyst"],
  "conversation_topic": "conversation/collab-task-123/main",
  "ready_to_publish": true
}
```
### mqtt_join_conversation
Use this when **another agent** invited you to a conversation.
```json
{
  "tool": "mqtt_join_conversation",
  "arguments": {
    "session_id": "collab-task-123",
    "agent_id": "worker-1",
    "broker_host": "127.0.0.1",
    "broker_port": 51234,
    "capabilities": ["data-analysis", "visualization"],
    "timeout_seconds": 30
  }
}
```
**Parameters:**
| Parameter | Type | Description |
|-----------|------|-------------|
| `session_id` | string | Session ID from host's invitation |
| `agent_id` | string | Your unique agent ID |
| `broker_host` | string | Broker host (from host's invitation) |
| `broker_port` | int | Broker port (from host's invitation) |
| `capabilities` | array | Optional list of your capabilities |
| `timeout_seconds` | float | Max wait for acknowledgement |
**Response (success):**
```json
{
  "success": true,
  "message": "Successfully joined conversation collab-task-123!",
  "session_id": "collab-task-123",
  "agent_id": "worker-1",
  "broker_host": "127.0.0.1",
  "broker_port": 51234,
  "other_agents": ["coordinator-agent", "worker-2", "analyst"],
  "conversation_topic": "conversation/collab-task-123/main",
  "ready_to_receive": true
}
```
## Example: Two-Agent Collaboration
### Agent A (Initiator/Host)
```python
# Step 1: Host the conversation
result = await mqtt_host_conversation(
    session_id="data-analysis-job",
    host_agent_id="data-processor",
    expected_agents=["visualizer"],
    timeout_seconds=30
)
if result["ready_to_publish"]:
    # Step 2: Safe to publish - visualizer is definitely subscribed!
    await mqtt_publish(
        topic=result["conversation_topic"],
        payload={"type": "data", "values": [1, 2, 3, 4, 5]}
    )
```
### Agent B (Joiner)
```python
# Step 1: Join using info from Agent A
result = await mqtt_join_conversation(
    session_id="data-analysis-job",
    agent_id="visualizer",
    broker_host="127.0.0.1",
    broker_port=51234
)
if result["ready_to_receive"]:
    # Step 2: Now receive messages - guaranteed not to miss any!
    messages = await mqtt_get_messages(
        topic=result["conversation_topic"]
    )
```
## Topic Structure
The protocol uses reserved topics under `$coordination/`:
```
$coordination/{session_id}/
├── broker_ready # Host publishes broker info (retained)
├── join # Agents publish join requests
├── joined/{agent_id} # Host acknowledges each agent (retained)
├── ready # Host signals all agents ready (retained)
└── heartbeat/{agent_id} # Optional: agent heartbeats
```
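Several of these topics are marked retained so that an agent subscribing after the host has published still receives the latest value. The following sketch models that behaviour in memory; `RetainingBroker` is a hypothetical illustration of MQTT retained-message semantics, not mcmqtt code.

```python
class RetainingBroker:
    """Hypothetical in-memory model of MQTT retained-message delivery."""

    def __init__(self):
        self.retained = {}      # topic -> last retained payload
        self.subscribers = {}   # topic -> list of inboxes

    def publish(self, topic, payload, retain=False):
        if retain:
            self.retained[topic] = payload
        for inbox in self.subscribers.get(topic, []):
            inbox.append(payload)

    def subscribe(self, topic):
        inbox = []
        self.subscribers.setdefault(topic, []).append(inbox)
        if topic in self.retained:
            inbox.append(self.retained[topic])  # replay to the late subscriber
        return inbox


broker = RetainingBroker()
session = "collab-task-123"
broker.publish(f"$coordination/{session}/ready", '{"state": "ready"}', retain=True)
broker.publish(f"$coordination/{session}/join", '{"agent_id": "worker-1"}')

late_ready = broker.subscribe(f"$coordination/{session}/ready")  # sees retained value
late_join = broker.subscribe(f"$coordination/{session}/join")    # sees nothing
```

This is why `join` requests are not retained while `broker_ready`, `joined/{agent_id}`, and `ready` are: the host subscribes to `join` before announcing itself, whereas joiners may arrive at any time.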
After handshake, conversations use:
```
conversation/{session_id}/
├── main # Primary conversation channel
├── {channel_name} # Additional named channels
└── ...
```
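An agent that wants every channel of a session can subscribe with an MQTT wildcard such as `conversation/{session_id}/#`. The helper below is a simplified sketch of how such a filter matches topics; `topic_matches` is a hypothetical name, and the function ignores the special handling of `$`-prefixed topics.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT filter match supporting '+' and a trailing '#'."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            return True                      # matches this level and everything below
        if i >= len(topic_parts):
            return False                     # filter is longer than the topic
        if part != "+" and part != topic_parts[i]:
            return False
    return len(filter_parts) == len(topic_parts)


all_channels = "conversation/collab-task-123/#"
print(topic_matches(all_channels, "conversation/collab-task-123/main"))
print(topic_matches(all_channels, "conversation/other-session/main"))
```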
## Timeout Handling
If expected agents don't join within the timeout:
```json
{
  "success": false,
  "message": "Timeout waiting for agents. Missing: ['worker-2']",
  "session_id": "collab-task-123",
  "state": "timeout",
  "joined_agents": ["worker-1", "analyst"],
  "missing_agents": ["worker-2"],
  "ready_to_publish": false
}
```
The host can then decide whether to:
- Retry with a longer timeout
- Proceed with available agents
- Abort the conversation
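The first option can be sketched as a small retry loop. This is an illustration only: `host_with_retry` and `fake_host` are hypothetical names, `fake_host` stands in for `mqtt_host_conversation`, and whether the real tool can be re-invoked with the same `session_id` is an assumption to verify against the implementation.

```python
import asyncio


async def host_with_retry(host_fn, timeouts=(1.0, 2.0)):
    """Call host_fn(timeout_seconds=...) with increasing timeouts until ready."""
    result = None
    for timeout in timeouts:
        result = await host_fn(timeout_seconds=timeout)
        if result.get("ready_to_publish"):
            return result
    # Still not ready: the caller may proceed with joined_agents or abort.
    return result


# Stand-in for mqtt_host_conversation: times out once, then succeeds.
calls = []


async def fake_host(timeout_seconds):
    calls.append(timeout_seconds)
    if len(calls) == 1:
        return {"success": False, "state": "timeout",
                "missing_agents": ["worker-2"], "ready_to_publish": False}
    return {"success": True, "state": "ready", "ready_to_publish": True}


result = asyncio.run(host_with_retry(fake_host))
```

If every attempt times out, the last response is returned unchanged, so `missing_agents` is still available for deciding between proceeding and aborting.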
## Best Practices
1. **Always use coordination tools for multi-agent work** - Don't use raw `mqtt_connect` + `mqtt_publish` when coordinating with other agents
2. **Choose meaningful session IDs** - Include context like `task-{id}-{timestamp}` for debugging
3. **Set appropriate timeouts** - Network latency and agent startup time vary; 30 seconds is a safe default
4. **Check the response** - Always verify `ready_to_publish` (host) or `ready_to_receive` (joiner) before proceeding
5. **Handle failures gracefully** - Timeout doesn't mean failure; retry logic is your friend
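
Practices 2 and 4 are easy to wrap in helpers. The sketch below is one possible shape; `make_session_id` and `is_ready` are hypothetical names, and the timestamp format (Unix seconds) is an assumption.

```python
import time
from typing import Optional


def make_session_id(task_id: str, now: Optional[float] = None) -> str:
    """Build a session ID of the form task-{id}-{timestamp} (Unix seconds)."""
    timestamp = int(time.time() if now is None else now)
    return f"task-{task_id}-{timestamp}"


def is_ready(result: dict, role: str) -> bool:
    """Return True only if the handshake response says it is safe to proceed."""
    flag = "ready_to_publish" if role == "host" else "ready_to_receive"
    return bool(result.get("success")) and bool(result.get(flag))


session_id = make_session_id("42", now=1770464430)
print(session_id)  # task-42-1770464430
```

Because the session ID becomes part of MQTT topic names, it is safest to keep `/`, `+`, and `#` out of it.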


@ -0,0 +1,43 @@
"""Agent-to-agent coordination module for mcmqtt.

This module implements the host/join handshake protocol that ensures
agents don't experience "ships passing in the night" - dropped messages
because one agent published before the other subscribed.

The pattern is simple:
1. Initiating agent ALWAYS hosts (spawns broker, waits for joiners)
2. Joining agents connect and signal readiness
3. Only after all expected agents join does message exchange begin

Example:
    # Agent A (initiator/host):
    result = await mqtt_host_conversation(
        session_id="collab-123",
        expected_agents=["agent-b", "agent-c"],
        timeout_seconds=30
    )
    # Returns broker connection info and confirms all agents joined

    # Agent B (joiner):
    result = await mqtt_join_conversation(
        session_id="collab-123",
        agent_id="agent-b",
        broker_host="localhost",
        broker_port=1883
    )
    # Returns when successfully joined and host acknowledged
"""

from .protocol import (
    ConversationHost,
    ConversationJoiner,
    HandshakeProtocol,
    ConversationState,
)

__all__ = [
    "ConversationHost",
    "ConversationJoiner",
    "HandshakeProtocol",
    "ConversationState",
]


@ -0,0 +1,542 @@
"""Handshake protocol for agent-to-agent coordination.

This module solves the "ships passing in the night" problem where agents
publish messages before other agents have subscribed, resulting in dropped
messages.

The solution is a simple handshake:
1. HOST spawns/owns the broker and waits for joiners
2. JOINERs connect and signal they're ready
3. HOST acknowledges all joiners before conversation begins
"""

import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Set, Any, Callable

logger = logging.getLogger(__name__)


class ConversationState(Enum):
    """State of a coordinated conversation."""
    INITIALIZING = "initializing"    # Host is setting up
    WAITING_FOR_AGENTS = "waiting"   # Host waiting for agents to join
    READY = "ready"                  # All agents joined, conversation active
    CLOSED = "closed"                # Conversation ended
    TIMEOUT = "timeout"              # Timed out waiting for agents
    ERROR = "error"                  # Error occurred


@dataclass
class AgentInfo:
    """Information about a joined agent."""
    agent_id: str
    joined_at: datetime
    capabilities: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ConversationSession:
    """Represents an active conversation session."""
    session_id: str
    host_agent_id: str
    state: ConversationState
    broker_host: str
    broker_port: int
    created_at: datetime
    expected_agents: List[str]
    joined_agents: Dict[str, AgentInfo] = field(default_factory=dict)
    topics: Dict[str, str] = field(default_factory=dict)  # name -> full topic

    @property
    def all_agents_joined(self) -> bool:
        """Check if all expected agents have joined."""
        return all(
            agent_id in self.joined_agents
            for agent_id in self.expected_agents
        )

    @property
    def missing_agents(self) -> List[str]:
        """Get list of agents that haven't joined yet."""
        return [
            agent_id for agent_id in self.expected_agents
            if agent_id not in self.joined_agents
        ]


class HandshakeProtocol:
    """
    Defines the MQTT topic structure for coordination handshakes.

    Topic Structure:
        $coordination/{session_id}/
            broker_ready          # Host publishes broker info here
            join                  # Agents publish join requests here
            joined/{agent_id}     # Host acknowledges each agent
            ready                 # Host signals all agents ready
            heartbeat/{agent_id}  # Agents send periodic heartbeats
    """

    PREFIX = "$coordination"

    @classmethod
    def broker_ready_topic(cls, session_id: str) -> str:
        """Topic where host publishes broker ready signal."""
        return f"{cls.PREFIX}/{session_id}/broker_ready"

    @classmethod
    def join_topic(cls, session_id: str) -> str:
        """Topic where agents publish join requests."""
        return f"{cls.PREFIX}/{session_id}/join"

    @classmethod
    def joined_topic(cls, session_id: str, agent_id: str) -> str:
        """Topic where host acknowledges a specific agent."""
        return f"{cls.PREFIX}/{session_id}/joined/{agent_id}"

    @classmethod
    def all_ready_topic(cls, session_id: str) -> str:
        """Topic where host signals all agents are ready."""
        return f"{cls.PREFIX}/{session_id}/ready"

    @classmethod
    def heartbeat_topic(cls, session_id: str, agent_id: str) -> str:
        """Topic for agent heartbeats."""
        return f"{cls.PREFIX}/{session_id}/heartbeat/{agent_id}"

    @classmethod
    def conversation_topic(cls, session_id: str, channel: str = "main") -> str:
        """Topic for actual conversation messages (after handshake)."""
        return f"conversation/{session_id}/{channel}"


class ConversationHost:
    """
    Manages hosting a conversation - the initiating agent's side.

    The host:
    1. Spawns or uses an existing broker
    2. Subscribes to the join topic
    3. Publishes broker_ready message
    4. Waits for expected agents, acknowledging each one as it joins
    5. Signals all_ready when everyone has joined

    Usage:
        host = ConversationHost(
            mqtt_client=client,
            broker_manager=manager,
            session_id="collab-123",
            host_agent_id="coordinator",
            expected_agents=["worker-1", "worker-2"]
        )
        session = await host.start_and_wait(timeout=30)
        if session.state == ConversationState.READY:
            # All agents joined - safe to publish messages
            ...
    """

    def __init__(
        self,
        mqtt_client: Any,
        broker_manager: Any,
        session_id: str,
        host_agent_id: str,
        expected_agents: List[str],
        broker_host: str = "127.0.0.1",
        broker_port: int = 0,  # 0 = auto-assign
    ):
        self.mqtt_client = mqtt_client
        self.broker_manager = broker_manager
        self.session_id = session_id
        self.host_agent_id = host_agent_id
        self.expected_agents = expected_agents
        self.broker_host = broker_host
        self.broker_port = broker_port
        self._session: Optional[ConversationSession] = None
        self._join_event = asyncio.Event()
        self._joined_agents: Dict[str, AgentInfo] = {}

    async def start_and_wait(self, timeout: float = 30.0) -> ConversationSession:
        """
        Start hosting and wait for all expected agents to join.

        Args:
            timeout: Maximum seconds to wait for all agents

        Returns:
            ConversationSession with state indicating success/failure
        """
        try:
            # Step 1: Spawn broker if needed
            actual_port = await self._ensure_broker()

            # Create session
            self._session = ConversationSession(
                session_id=self.session_id,
                host_agent_id=self.host_agent_id,
                state=ConversationState.INITIALIZING,
                broker_host=self.broker_host,
                broker_port=actual_port,
                created_at=datetime.now(),
                expected_agents=self.expected_agents,
            )

            # Step 2: Subscribe to join topic
            join_topic = HandshakeProtocol.join_topic(self.session_id)
            await self._subscribe_for_joins(join_topic)

            # Step 3: Publish broker_ready
            await self._publish_broker_ready()
            self._session.state = ConversationState.WAITING_FOR_AGENTS
            logger.info(
                f"[HOST] Session {self.session_id} waiting for agents: "
                f"{self.expected_agents}"
            )

            # Step 4: Wait for all agents with timeout
            try:
                await asyncio.wait_for(
                    self._wait_for_all_agents(),
                    timeout=timeout
                )
                # All agents joined!
                self._session.state = ConversationState.READY
                self._session.joined_agents = self._joined_agents.copy()

                # Signal all ready
                await self._publish_all_ready()
                logger.info(
                    f"[HOST] Session {self.session_id} READY with agents: "
                    f"{list(self._joined_agents.keys())}"
                )
            except asyncio.TimeoutError:
                self._session.state = ConversationState.TIMEOUT
                self._session.joined_agents = self._joined_agents.copy()
                logger.warning(
                    f"[HOST] Timeout waiting for agents. "
                    f"Missing: {self._session.missing_agents}"
                )

            return self._session

        except Exception as e:
            if self._session:
                self._session.state = ConversationState.ERROR
            logger.error(f"[HOST] Error starting conversation: {e}")
            raise

    async def _ensure_broker(self) -> int:
        """Ensure broker is running, spawn if needed. Returns actual port."""
        if self.broker_port > 0:
            # Using existing broker at specified port
            return self.broker_port

        # Spawn new broker
        from ..broker import BrokerConfig
        config = BrokerConfig(
            port=0,  # Auto-assign
            host=self.broker_host,
            name=f"conversation-{self.session_id}",
        )
        broker_id = await self.broker_manager.spawn_broker(config)
        broker_info = await self.broker_manager.get_broker_status(broker_id)
        actual_port = broker_info.config.port
        logger.info(f"[HOST] Spawned broker on port {actual_port}")
        return actual_port

    async def _subscribe_for_joins(self, topic: str):
        """Subscribe to join topic and handle incoming join requests."""

        def handle_join(message):
            """Handle join request from an agent."""
            try:
                payload = json.loads(message.payload_str)
                agent_id = payload.get("agent_id")
                if agent_id and agent_id in self.expected_agents:
                    self._joined_agents[agent_id] = AgentInfo(
                        agent_id=agent_id,
                        joined_at=datetime.now(),
                        capabilities=payload.get("capabilities", []),
                        metadata=payload.get("metadata", {}),
                    )
                    logger.info(f"[HOST] Agent joined: {agent_id}")

                    # Acknowledge the join
                    asyncio.create_task(self._acknowledge_agent(agent_id))

                    # Check if all agents have joined
                    if len(self._joined_agents) >= len(self.expected_agents):
                        self._join_event.set()
            except Exception as e:
                logger.error(f"[HOST] Error handling join: {e}")

        # Register handler and subscribe
        if hasattr(self.mqtt_client, 'add_message_handler'):
            self.mqtt_client.add_message_handler(topic, handle_join)

        from ..mqtt.types import MQTTQoS
        await self.mqtt_client.subscribe(topic, MQTTQoS.AT_LEAST_ONCE)

    async def _publish_broker_ready(self):
        """Publish broker ready signal with connection info."""
        topic = HandshakeProtocol.broker_ready_topic(self.session_id)
        payload = json.dumps({
            "session_id": self.session_id,
            "host_agent_id": self.host_agent_id,
            "broker_host": self.broker_host,
            "broker_port": self._session.broker_port,
            "expected_agents": self.expected_agents,
            "timestamp": datetime.now().isoformat(),
        })
        from ..mqtt.types import MQTTQoS
        await self.mqtt_client.publish(
            topic=topic,
            payload=payload,
            qos=MQTTQoS.AT_LEAST_ONCE,
            retain=True  # Retain so late joiners see it
        )

    async def _acknowledge_agent(self, agent_id: str):
        """Acknowledge a specific agent's join."""
        topic = HandshakeProtocol.joined_topic(self.session_id, agent_id)
        payload = json.dumps({
            "session_id": self.session_id,
            "agent_id": agent_id,
            "acknowledged": True,
            "timestamp": datetime.now().isoformat(),
        })
        from ..mqtt.types import MQTTQoS
        await self.mqtt_client.publish(
            topic=topic,
            payload=payload,
            qos=MQTTQoS.AT_LEAST_ONCE,
            retain=True
        )

    async def _wait_for_all_agents(self):
        """Wait until all expected agents have joined."""
        while len(self._joined_agents) < len(self.expected_agents):
            await asyncio.sleep(0.1)
            if self._join_event.is_set():
                break

    async def _publish_all_ready(self):
        """Publish signal that all agents are ready."""
        topic = HandshakeProtocol.all_ready_topic(self.session_id)
        payload = json.dumps({
            "session_id": self.session_id,
            "state": "ready",
            "agents": list(self._joined_agents.keys()),
            "timestamp": datetime.now().isoformat(),
        })
        from ..mqtt.types import MQTTQoS
        await self.mqtt_client.publish(
            topic=topic,
            payload=payload,
            qos=MQTTQoS.AT_LEAST_ONCE,
            retain=True
        )


class ConversationJoiner:
    """
    Manages joining a conversation - the invited agent's side.

    The joiner:
    1. Connects to the broker (using info from host)
    2. Subscribes to its acknowledgement topic and the all_ready topic
    3. Publishes join request
    4. Waits for host acknowledgement
    5. Waits for all_ready signal

    Usage:
        joiner = ConversationJoiner(
            mqtt_client=client,
            session_id="collab-123",
            agent_id="worker-1",
            broker_host="localhost",
            broker_port=1883
        )
        result = await joiner.join_and_wait(timeout=30)
        if result["success"]:
            # Successfully joined - safe to exchange messages
            ...
    """

    def __init__(
        self,
        mqtt_client: Any,
        session_id: str,
        agent_id: str,
        broker_host: str,
        broker_port: int,
        capabilities: Optional[List[str]] = None,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        self.mqtt_client = mqtt_client
        self.session_id = session_id
        self.agent_id = agent_id
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.capabilities = capabilities or []
        self.metadata = metadata or {}
        self._acknowledged = asyncio.Event()
        self._all_ready = asyncio.Event()
        self._other_agents: List[str] = []

    async def join_and_wait(self, timeout: float = 30.0) -> Dict[str, Any]:
        """
        Join the conversation and wait for acknowledgement.

        Args:
            timeout: Maximum seconds to wait for acknowledgement

        Returns:
            Dict with success status and session info
        """
        try:
            # Step 1: Subscribe to our acknowledgement topic
            ack_topic = HandshakeProtocol.joined_topic(self.session_id, self.agent_id)
            await self._subscribe_for_acknowledgement(ack_topic)

            # Step 2: Subscribe to all_ready topic
            ready_topic = HandshakeProtocol.all_ready_topic(self.session_id)
            await self._subscribe_for_ready(ready_topic)

            # Step 3: Publish join request
            await self._publish_join_request()
            logger.info(
                f"[JOIN] Agent {self.agent_id} joining session {self.session_id}"
            )

            # Step 4: Wait for acknowledgement and all_ready
            try:
                await asyncio.wait_for(
                    self._wait_for_ready(),
                    timeout=timeout
                )
                logger.info(
                    f"[JOIN] Agent {self.agent_id} successfully joined! "
                    f"Other agents: {self._other_agents}"
                )
                return {
                    "success": True,
                    "session_id": self.session_id,
                    "agent_id": self.agent_id,
                    "broker_host": self.broker_host,
                    "broker_port": self.broker_port,
                    "other_agents": self._other_agents,
                    "conversation_topic": HandshakeProtocol.conversation_topic(
                        self.session_id
                    ),
                }
            except asyncio.TimeoutError:
                logger.warning(
                    f"[JOIN] Timeout waiting for session {self.session_id}"
                )
                return {
                    "success": False,
                    "error": "timeout",
                    "message": "Timed out waiting for host acknowledgement",
                    "session_id": self.session_id,
                    "agent_id": self.agent_id,
                }

        except Exception as e:
            logger.error(f"[JOIN] Error joining conversation: {e}")
            return {
                "success": False,
                "error": "exception",
                "message": str(e),
                "session_id": self.session_id,
                "agent_id": self.agent_id,
            }
async def _subscribe_for_acknowledgement(self, topic: str):
"""Subscribe to our acknowledgement topic."""
def handle_ack(message):
try:
payload = json.loads(message.payload_str)
if payload.get("acknowledged") and payload.get("agent_id") == self.agent_id:
logger.info(f"[JOIN] Received acknowledgement for {self.agent_id}")
self._acknowledged.set()
except Exception as e:
logger.error(f"[JOIN] Error handling ack: {e}")
if hasattr(self.mqtt_client, 'add_message_handler'):
self.mqtt_client.add_message_handler(topic, handle_ack)
from ..mqtt.types import MQTTQoS
await self.mqtt_client.subscribe(topic, MQTTQoS.AT_LEAST_ONCE)
async def _subscribe_for_ready(self, topic: str):
"""Subscribe to all_ready topic."""
def handle_ready(message):
try:
payload = json.loads(message.payload_str)
if payload.get("state") == "ready":
self._other_agents = [
a for a in payload.get("agents", [])
if a != self.agent_id
]
logger.info(f"[JOIN] All agents ready signal received")
self._all_ready.set()
except Exception as e:
logger.error(f"[JOIN] Error handling ready: {e}")
if hasattr(self.mqtt_client, 'add_message_handler'):
self.mqtt_client.add_message_handler(topic, handle_ready)
from ..mqtt.types import MQTTQoS
await self.mqtt_client.subscribe(topic, MQTTQoS.AT_LEAST_ONCE)
async def _publish_join_request(self):
"""Publish join request to host."""
topic = HandshakeProtocol.join_topic(self.session_id)
payload = json.dumps({
"agent_id": self.agent_id,
"session_id": self.session_id,
"capabilities": self.capabilities,
"metadata": self.metadata,
"timestamp": datetime.now().isoformat(),
})
from ..mqtt.types import MQTTQoS
await self.mqtt_client.publish(
topic=topic,
payload=payload,
qos=MQTTQoS.AT_LEAST_ONCE,
)
async def _wait_for_ready(self):
"""Wait for both acknowledgement and all_ready signal."""
await self._acknowledged.wait()
await self._all_ready.wait()
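
The joiner's final step waits on two events that the MQTT callbacks set, and the timeout branch earlier in this file shows that the caller bounds that wait somehow. The standalone sketch below illustrates one way this could look. It assumes both flags are `asyncio.Event` objects and that the bound is applied with `asyncio.wait_for`, neither of which is visible in this hunk. The names `wait_for_handshake`, `simulate_host`, and `demo` are hypothetical and not part of the commit.

```python
import asyncio


async def wait_for_handshake(acknowledged: asyncio.Event,
                             all_ready: asyncio.Event,
                             timeout: float) -> dict:
    """Wait until both handshake events are set, or report a timeout."""

    async def _wait_both() -> None:
        # Order does not matter: each wait() returns immediately
        # if its event has already been set.
        await acknowledged.wait()
        await all_ready.wait()

    try:
        await asyncio.wait_for(_wait_both(), timeout=timeout)
    except asyncio.TimeoutError:
        return {"success": False, "error": "timeout"}
    return {"success": True}


async def simulate_host(acknowledged: asyncio.Event,
                        all_ready: asyncio.Event,
                        delay: float) -> None:
    """Stand-in for the MQTT callbacks (assumption: they only set events)."""
    await asyncio.sleep(delay)
    acknowledged.set()
    all_ready.set()


async def demo() -> tuple:
    # Case 1: the host answers well within the timeout.
    ack, ready = asyncio.Event(), asyncio.Event()
    host = asyncio.create_task(simulate_host(ack, ready, 0.01))
    ok = await wait_for_handshake(ack, ready, timeout=1.0)
    await host

    # Case 2: nobody answers, so the wait times out.
    late = await wait_for_handshake(asyncio.Event(), asyncio.Event(), timeout=0.05)
    return ok, late


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Waiting on the two events in sequence is equivalent to waiting on both, because an already-set event returns at once. The timeout therefore covers the whole handshake rather than each signal separately.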


@ -15,6 +15,7 @@ from ..mqtt import MQTTClient, MQTTConfig, MQTTPublisher, MQTTSubscriber
from ..mqtt.types import MQTTConnectionState, MQTTQoS
from ..broker import BrokerManager, BrokerConfig
from ..middleware import MQTTBrokerMiddleware
from ..coordination import ConversationHost, ConversationJoiner, ConversationState

logger = logging.getLogger(__name__)
@ -51,6 +52,9 @@ class MCMQTTServer(MCPMixin):
        self._connection_state = MQTTConnectionState.DISCONNECTED
        self._last_error: Optional[str] = None
        self._message_store: List[Dict[str, Any]] = []

        # Conversation coordination state
        self._active_conversations: Dict[str, Any] = {}  # session_id -> ConversationSession

        # Register all MCP components
        self.register_all(self.mcp)
@ -604,7 +608,305 @@ class MCMQTTServer(MCPMixin):
                "message": f"Error stopping brokers: {str(e)}",
                "stopped_count": 0
            }

    # Agent-to-Agent Coordination Tools
    # These tools solve the "ships passing in the night" problem where agents
    # publish messages before other agents have subscribed, causing dropped messages.
    @mcp_tool(
        name="mqtt_host_conversation",
        description="""Host a coordinated agent-to-agent conversation. The initiating agent
        MUST use this tool to ensure other agents are ready before publishing.

        This tool:
        1. Spawns an embedded broker (or uses existing one)
        2. Publishes broker connection info
        3. Waits for ALL expected agents to join and subscribe
        4. Returns only when conversation is ready

        CRITICAL: Always use this instead of mqtt_connect when coordinating with other agents.
        This prevents 'dropped messages' where you publish before others subscribe."""
    )
    async def host_conversation(
        self,
        session_id: str,
        host_agent_id: str,
        expected_agents: List[str],
        broker_host: str = "127.0.0.1",
        broker_port: int = 0,
        timeout_seconds: float = 30.0
    ) -> Dict[str, Any]:
        """
        Host a coordinated conversation - initiating agent's role.

        Args:
            session_id: Unique identifier for this conversation
            host_agent_id: Your agent ID (the host)
            expected_agents: List of agent IDs that must join before starting
            broker_host: Host to bind broker to (default: 127.0.0.1)
            broker_port: Port for broker (0 = auto-assign)
            timeout_seconds: Max time to wait for agents to join

        Returns:
            Dict with broker connection info and conversation state
        """
        try:
            # Ensure we have a client (may need to initialize if not done)
            if not self.mqtt_client:
                # Initialize with default config for hosting
                config = MQTTConfig(
                    broker_host=broker_host,
                    broker_port=broker_port if broker_port > 0 else 1883,
                    client_id=f"host-{host_agent_id}"
                )
                await self.initialize_mqtt_client(config)

            # Create the conversation host
            host = ConversationHost(
                mqtt_client=self.mqtt_client,
                broker_manager=self.broker_manager,
                session_id=session_id,
                host_agent_id=host_agent_id,
                expected_agents=expected_agents,
                broker_host=broker_host,
                broker_port=broker_port,
            )

            # Start hosting and wait for agents
            logger.info(
                f"[HOST] Starting conversation {session_id}, "
                f"waiting for agents: {expected_agents}"
            )
            session = await host.start_and_wait(timeout=timeout_seconds)

            # Store active conversation
            self._active_conversations[session_id] = session

            # Build response based on state
            if session.state == ConversationState.READY:
                return {
                    "success": True,
                    "message": f"Conversation ready! All {len(expected_agents)} agents joined.",
                    "session_id": session_id,
                    "state": session.state.value,
                    "broker_host": session.broker_host,
                    "broker_port": session.broker_port,
                    "broker_url": f"mqtt://{session.broker_host}:{session.broker_port}",
                    "joined_agents": list(session.joined_agents.keys()),
                    "conversation_topic": f"conversation/{session_id}/main",
                    "ready_to_publish": True
                }
            elif session.state == ConversationState.TIMEOUT:
                return {
                    "success": False,
                    "message": f"Timeout waiting for agents. Missing: {session.missing_agents}",
                    "session_id": session_id,
                    "state": session.state.value,
                    "broker_host": session.broker_host,
                    "broker_port": session.broker_port,
                    "joined_agents": list(session.joined_agents.keys()),
                    "missing_agents": session.missing_agents,
                    "ready_to_publish": False
                }
            else:
                return {
                    "success": False,
                    "message": f"Conversation in unexpected state: {session.state.value}",
                    "session_id": session_id,
                    "state": session.state.value,
                    "ready_to_publish": False
                }
        except Exception as e:
            logger.error(f"Error hosting conversation: {e}")
            return {
                "success": False,
                "message": f"Error hosting conversation: {str(e)}",
                "session_id": session_id,
                "ready_to_publish": False
            }

    @mcp_tool(
        name="mqtt_join_conversation",
        description="""Join a coordinated agent-to-agent conversation hosted by another agent.

        This tool:
        1. Connects to the broker at the specified host/port
        2. Subscribes to coordination topics
        3. Signals to the host that you're ready
        4. Waits for acknowledgement and 'all ready' signal

        CRITICAL: Always use this instead of mqtt_connect when joining a conversation
        started by another agent. This ensures you're subscribed BEFORE messages flow."""
    )
    async def join_conversation(
        self,
        session_id: str,
        agent_id: str,
        broker_host: str,
        broker_port: int,
        capabilities: Optional[List[str]] = None,
        timeout_seconds: float = 30.0
    ) -> Dict[str, Any]:
        """
        Join a coordinated conversation - invited agent's role.

        Args:
            session_id: Session ID provided by the hosting agent
            agent_id: Your unique agent ID
            broker_host: Broker host (from host's invitation)
            broker_port: Broker port (from host's invitation)
            capabilities: Optional list of your capabilities
            timeout_seconds: Max time to wait for acknowledgement

        Returns:
            Dict with join status and conversation info
        """
        try:
            # First connect to the broker
            config = MQTTConfig(
                broker_host=broker_host,
                broker_port=broker_port,
                client_id=f"joiner-{agent_id}"
            )
            success = await self.initialize_mqtt_client(config)
            if not success:
                return {
                    "success": False,
                    "message": f"Failed to initialize MQTT client: {self._last_error}",
                    "session_id": session_id,
                    "agent_id": agent_id
                }

            connect_success = await self.connect_mqtt()
            if not connect_success:
                return {
                    "success": False,
                    "message": f"Failed to connect to broker: {self._last_error}",
                    "session_id": session_id,
                    "agent_id": agent_id
                }

            # Create the conversation joiner
            joiner = ConversationJoiner(
                mqtt_client=self.mqtt_client,
                session_id=session_id,
                agent_id=agent_id,
                broker_host=broker_host,
                broker_port=broker_port,
                capabilities=capabilities or [],
            )
            logger.info(
                f"[JOIN] Agent {agent_id} joining conversation {session_id}"
            )

            # Join and wait for acknowledgement
            result = await joiner.join_and_wait(timeout=timeout_seconds)
            if result["success"]:
                return {
                    "success": True,
                    "message": f"Successfully joined conversation {session_id}!",
                    "session_id": session_id,
                    "agent_id": agent_id,
                    "broker_host": broker_host,
                    "broker_port": broker_port,
                    "other_agents": result.get("other_agents", []),
                    "conversation_topic": result.get("conversation_topic"),
                    "ready_to_receive": True
                }
            else:
                return {
                    "success": False,
                    "message": result.get("message", "Failed to join conversation"),
                    "error": result.get("error"),
                    "session_id": session_id,
                    "agent_id": agent_id,
                    "ready_to_receive": False
                }
        except Exception as e:
            logger.error(f"Error joining conversation: {e}")
            return {
                "success": False,
                "message": f"Error joining conversation: {str(e)}",
                "session_id": session_id,
                "agent_id": agent_id,
                "ready_to_receive": False
            }

    @mcp_tool(
        name="mqtt_conversation_status",
        description="Get status of an active coordinated conversation"
    )
    async def get_conversation_status(self, session_id: str) -> Dict[str, Any]:
        """Get the current status of a conversation session."""
        try:
            if session_id not in self._active_conversations:
                return {
                    "success": False,
                    "message": f"No active conversation with session_id: {session_id}",
                    "session_id": session_id
                }

            session = self._active_conversations[session_id]
            return {
                "success": True,
                "session_id": session_id,
                "state": session.state.value,
                "host_agent_id": session.host_agent_id,
                "broker_host": session.broker_host,
                "broker_port": session.broker_port,
                "expected_agents": session.expected_agents,
                "joined_agents": list(session.joined_agents.keys()),
                "missing_agents": session.missing_agents,
                "all_agents_joined": session.all_agents_joined,
                "created_at": session.created_at.isoformat()
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"Error getting conversation status: {str(e)}",
                "session_id": session_id
            }

    @mcp_tool(
        name="mqtt_list_conversations",
        description="List all active coordinated conversations"
    )
    async def list_conversations(self) -> Dict[str, Any]:
        """List all active conversation sessions."""
        try:
            conversations = []
            for session_id, session in self._active_conversations.items():
                conversations.append({
                    "session_id": session_id,
                    "state": session.state.value,
                    "host_agent_id": session.host_agent_id,
                    "broker_url": f"mqtt://{session.broker_host}:{session.broker_port}",
                    "agent_count": len(session.joined_agents),
                    "expected_count": len(session.expected_agents),
                    "all_ready": session.all_agents_joined
                })
            return {
                "success": True,
                "conversations": conversations,
                "total_count": len(conversations)
            }
        except Exception as e:
            return {
                "success": False,
                "message": f"Error listing conversations: {str(e)}",
                "conversations": []
            }

    # MCP Resources using MCPMixin pattern
    @mcp_resource(uri="mqtt://config")
    async def get_config_resource(self) -> Dict[str, Any]: