mcmqtt/README.md
Ryan Malloy da140603c1 Add AI model search-friendly problem/solution sections
- Common AI coordination challenges with mcmqtt solutions
- Keyword-rich content for AI model discovery
- Real-world problem statements models search for
- Solutions for multi-agent workflows, model chaining, fault tolerance
- Added community tagline for connection
- Optimized for AI model solution searching
2025-09-17 06:48:23 -06:00


🚀 mcmqtt - FastMCP MQTT Server

The most powerful FastMCP MQTT integration server on the planet 🌍


Enabling MQTT integration for MCP clients with embedded broker support and fractal agent orchestration

Key Features

  • 🔥 FastMCP Integration: Native Model Context Protocol server with MQTT tools
  • Embedded MQTT Brokers: Spawn brokers on-demand with zero configuration
  • 🏗️ Modular Architecture: Clean, testable, maintainable codebase
  • 🧪 Comprehensive Testing: 70+ tests with 96%+ coverage on core modules
  • 🌐 Cross-Platform: Designed for Linux, macOS, and Windows (TODO: Test on additional platforms)
  • 🔧 CLI & Programmatic: Use via command line or integrate into your code
  • 📡 Real-time Coordination: Perfect for agent swarms and distributed systems

🚀 Quick Start

Installation

Recommended: Use uvx for instant execution (no installation needed):

# Run directly with uvx (recommended)
uvx mcmqtt --help

# Start STDIO server for MCP clients
uvx mcmqtt

# HTTP mode for web integration  
uvx mcmqtt --transport http --port 8080

If you insist on traditional installation:

# Install with uv
uv add mcmqtt

# Or use pip
pip install mcmqtt

Instant MQTT Magic

# Start FastMCP MQTT server (default STDIO mode) - Just works!
uvx mcmqtt

# HTTP mode for web integration
uvx mcmqtt --transport http --port 8080

# Connect to existing broker (optional)
uvx mcmqtt --mqtt-host mqtt.example.com --mqtt-port 1883

MCP Integration

Add to your Claude Code MCP configuration:

# Add mcmqtt as an MCP server (zero configuration!)
claude mcp add task-buzz -- uvx mcmqtt

# Check that the server is registered
claude mcp get task-buzz

🛠️ Core Features

🏃‍♂️ FastMCP MQTT Tools

  • mqtt_connect - Connect to MQTT brokers
  • mqtt_publish - Publish messages with QoS support
  • mqtt_subscribe - Subscribe to topics with wildcards
  • mqtt_get_messages - Retrieve received messages
  • mqtt_status - Get connection and statistics
  • mqtt_spawn_broker - Create embedded brokers instantly
  • mqtt_list_brokers - Manage multiple brokers

🔧 Embedded Broker Management

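import asyncio

from mcmqtt.broker import BrokerManager


async def main() -> None:
    # Spawn a broker programmatically
    manager = BrokerManager()
    broker_info = await manager.spawn_broker(
        name="my-broker",
        port=1883,
        max_connections=100,
    )
    print(f"Broker running at: {broker_info.url}")


# await is only valid inside a coroutine, so run everything through asyncio
asyncio.run(main())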

📡 MQTT Client Integration

import asyncio

from mcmqtt.mqtt import MQTTClient
from mcmqtt.mqtt.types import MQTTConfig


async def main() -> None:
    config = MQTTConfig(
        broker_host="localhost",
        broker_port=1883,
        client_id="my-client",
    )
    client = MQTTClient(config)
    await client.connect()
    # Publish a single temperature reading
    await client.publish("sensors/temperature", "23.5")


asyncio.run(main())

🏗️ Architecture Excellence

This isn't your typical monolithic MQTT library. mcmqtt features a clean modular architecture:

mcmqtt/
├── cli/           # Command-line interface & argument parsing
├── config/        # Environment & configuration management  
├── logging/       # Structured logging setup
├── server/        # STDIO & HTTP server runners
├── mqtt/          # Core MQTT client functionality
├── mcp/           # FastMCP server integration
├── broker/        # Embedded broker management
└── middleware/    # Broker middleware & orchestration

🧪 Testing Excellence

  • 70+ comprehensive tests covering all modules
  • 96%+ code coverage on refactored components
  • Robust mocking for reliable CI/CD
  • Edge case coverage for production reliability

🌟 Real-World Use Cases

🤖 AI Agent Orchestration & Swarm Coordination

Fractal Agent Architecture: Build sophisticated agent swarms where parent agents delegate tasks to specialized subagents via MQTT messaging.

# 1. Setup mcmqtt as MCP server for agent coordination
claude mcp add task-buzz -- uvx mcmqtt

# 2. Now parent Claude Code agents can coordinate with subagents

Example: Multi-Agent Code Analysis Workflow

# Parent agent (you) delegates analysis tasks via MQTT.
# Lines starting with mqtt_ are MCP tool calls shown in shorthand, not shell commands.

# 1. Spawn embedded broker for coordination
mqtt_spawn_broker --name "code-analysis" --port 1883

# 2. Create specialized subagents for different domains  
claude -p "You are a security-analysis-agent. Subscribe to 'tasks/security' topic and analyze code for vulnerabilities. Use mqtt_subscribe and mqtt_publish tools."

claude -p "You are a performance-agent. Subscribe to 'tasks/performance' and optimize code. Report results to 'results/performance'."

claude -p "You are a test-agent. Subscribe to 'tasks/testing' and write comprehensive tests. Publish coverage to 'results/testing'."

# 3. Parent agent coordinates the workflow
mqtt_publish --topic "tasks/security" --payload '{"file": "auth.py", "priority": "high"}'
mqtt_publish --topic "tasks/performance" --payload '{"file": "database.py", "focus": "query_optimization"}'  
mqtt_publish --topic "tasks/testing" --payload '{"file": "api.py", "coverage_target": "95%"}'

# 4. Monitor results from all agents
mqtt_subscribe --topic "results/+" --callback aggregate_analysis_report
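The `results/+` subscription above relies on MQTT's single-level wildcard. As a rough illustration of how such filters match topics, here is a minimal sketch of the standard matching rules (`+` matches exactly one level, `#` matches all remaining levels). The function name `topic_matches` is hypothetical and not part of mcmqtt, and the sketch ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level,
            # and it matches zero or more remaining topic levels.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[index]:
            return False  # literal level does not match
    # Without '#', filter and topic must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


print(topic_matches("results/+", "results/security"))
print(topic_matches("results/+", "results/security/details"))
```

With these rules, `results/+` receives `results/security`, `results/performance`, and `results/testing`, but not deeper topics such as `results/security/details`.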

Advanced Fractal Patterns:

  • Recursive Task Delegation: Subagents can spawn their own sub-subagents for complex domains
  • Dynamic Load Balancing: Monitor agent workload via heartbeat topics, redistribute tasks automatically
  • Cross-Agent Learning: Agents share insights via knowledge/domain topics
  • Fault Tolerance: Dead agent detection via missed heartbeats, automatic task redistribution
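Dead agent detection via missed heartbeats can be sketched in a few lines. The example below is only an illustration under stated assumptions: `HeartbeatMonitor`, its method names, and the 30 second timeout are hypothetical and not part of mcmqtt. In practice `record` would be called whenever a message arrives on an agent's heartbeat topic.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass
class HeartbeatMonitor:
    """Tracks the last heartbeat time per agent (hypothetical helper)."""

    timeout_seconds: float = 30.0  # assumed value, tune per deployment
    last_seen: Dict[str, float] = field(default_factory=dict)

    def record(self, agent_id: str, timestamp: float) -> None:
        # Remember when this agent last reported in.
        self.last_seen[agent_id] = timestamp

    def dead_agents(self, now: float) -> List[str]:
        # An agent counts as dead once its last heartbeat is older than the timeout.
        return sorted(
            agent
            for agent, seen in self.last_seen.items()
            if now - seen > self.timeout_seconds
        )


monitor = HeartbeatMonitor(timeout_seconds=30.0)
monitor.record("security-analysis-agent", timestamp=100.0)
monitor.record("performance-agent", timestamp=125.0)
stale = monitor.dead_agents(now=140.0)
print(stale)
```

Tasks assigned to the agents in `stale` could then be republished to their original task topics so another agent picks them up.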

Use Cases:

  • Large Codebase Analysis: Break down monolith analysis across specialized agents (security, performance, testing, documentation)
  • Multi-Language Projects: Deploy language-specific agents (Python-agent, JavaScript-agent, Rust-agent)
  • Distributed Code Review: Parallel analysis of pull requests with different focuses
  • CI/CD Integration: Agents monitor git hooks, trigger builds, run tests, deploy services

📊 Real-Time IoT & Sensor Data Processing

Zero-Config IoT Integration: Perfect for collecting, processing, and routing sensor data without complex infrastructure.

# Example: Temperature monitoring system
# 1. Start mcmqtt server
uvx mcmqtt

# 2. Use MCP tools to:
#    - Subscribe to sensors/+/temperature 
#    - Publish alerts to alerts/high-temp
#    - Forward to analytics/warehouse
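The three steps in the comments above can be sketched as a pure routing function that decides what to publish for each incoming reading. This is a minimal sketch, not mcmqtt's implementation: the function name `route_reading`, the JSON record layout, and the 30 °C threshold are assumptions chosen for illustration.

```python
import json
from typing import List, Tuple

HIGH_TEMP_CELSIUS = 30.0  # assumed threshold, not defined by mcmqtt


def route_reading(topic: str, payload: str) -> List[Tuple[str, str]]:
    """Return (topic, payload) pairs to publish for one sensors/<id>/temperature reading."""
    levels = topic.split("/")
    if len(levels) != 3 or levels[0] != "sensors" or levels[2] != "temperature":
        return []  # not a temperature reading, ignore it
    sensor_id = levels[1]
    value = float(payload)
    record = json.dumps({"sensor": sensor_id, "celsius": value})
    outgoing = [("analytics/warehouse", record)]  # every reading is forwarded
    if value > HIGH_TEMP_CELSIUS:
        outgoing.append(("alerts/high-temp", record))  # hot readings also raise an alert
    return outgoing


for out_topic, out_payload in route_reading("sensors/kitchen/temperature", "35.0"):
    print(out_topic, out_payload)
```

Keeping the decision logic separate from the MQTT calls makes it easy to test without a running broker.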

Use Cases:

  • Smart building automation: Collect data from thermostats, motion sensors, lighting systems
  • Industrial monitoring: Equipment health, production metrics, safety alerts
  • Environmental tracking: Weather stations, air quality monitors, noise levels
  • Agricultural IoT: Soil moisture, crop monitoring, automated irrigation

🔄 Microservice Event Streaming

Service Mesh Communication: Replace complex message queues with simple MQTT patterns for microservice coordination.

# Start dedicated MQTT server for service mesh
uvx mcmqtt --transport http --port 8080

# Services communicate via MQTT topics:
# - orders/created, orders/fulfilled
# - payments/processed, payments/failed  
# - inventory/updated, inventory/low-stock

Use Cases:

  • Event sourcing: Capture all state changes as MQTT events for audit trails
  • Saga patterns: Coordinate complex business transactions across multiple services
  • Cache invalidation: Notify services when shared data changes
  • Health monitoring: Services publish heartbeats and status updates

🎮 Real-Time Gaming & Live Applications

Low-Latency Messaging: Build multiplayer games, live chat, and real-time collaboration tools.

# Game server coordination
claude mcp add game-events -- uvx mcmqtt --transport http

Use Cases:

  • Multiplayer game state: Player positions, actions, world updates
  • Live chat systems: Message broadcasting, user presence, typing indicators
  • Collaborative editing: Document changes, cursor positions, user selections
  • Live streaming: Chat messages, viewer counts, donations, moderation events

🏭 DevOps & Infrastructure Automation

Deployment Orchestration: Coordinate deployments, monitoring, and incident response across infrastructure.

# Infrastructure coordination
uvx mcmqtt --mqtt-host production-mqtt.company.com

Use Cases:

  • CI/CD pipelines: Build status, deployment triggers, rollback coordination
  • Container orchestration: Service discovery, load balancer updates, scaling events
  • Monitoring alerts: Error spikes, performance degradation, security incidents
  • Backup coordination: Database snapshots, file synchronization, disaster recovery

🛍️ E-commerce & Business Process Automation

Order Processing Workflows: Handle complex business processes with event-driven patterns.

Use Cases:

  • Order fulfillment: Inventory checks → payment processing → shipping → delivery notifications
  • Customer journey: Cart abandonment → email campaigns → purchase → support follow-up
  • Supply chain: Supplier updates → inventory management → demand forecasting
  • Fraud detection: Transaction monitoring → risk scoring → manual review triggers

📱 Mobile & Edge Computing

Offline-First Applications: Handle intermittent connectivity and edge computing scenarios.

Use Cases:

  • Mobile app synchronization: User data, settings, offline queue processing
  • Edge device coordination: Factory floor devices, retail kiosks, field equipment
  • Bandwidth optimization: Intelligent data batching, compression, prioritization
  • Mesh networking: Device-to-device communication, local data processing

🧠 Multi-Model AI Orchestration & MCP Server Coordination

The Ultimate AI Party: Coordinate different AI models, MCP servers, and specialized tools through MQTT messaging for complex multi-step workflows.

# Setup the coordination hub
claude mcp add task-buzz -- uvx mcmqtt

# Add your MCP server friends to the party
claude mcp add web-search -- uvx mcp-server-fetch
claude mcp add code-search -- uvx searchmcp  
claude mcp add file-ops -- uvx mcp-server-filesystem
claude mcp add db-query -- uvx mcp-server-sqlite
claude mcp add memory-store -- uvx mcp-server-memory
claude mcp add brave-search -- uvx mcp-server-brave-search

Example: Multi-Model Research & Analysis Pipeline

# 1. Coordinator agent orchestrates the entire workflow
mqtt_publish --topic "workflow/start" --payload '{
  "task": "analyze_ai_trends_2024",
  "models_needed": ["web_search", "code_analysis", "data_processing", "report_generation"]
}'

# 2. Web Search Agent (Claude + Brave Search MCP)
claude -p "You are web-search-agent. Subscribe to 'tasks/web_search'. Use brave-search tools to find latest AI trends. Publish findings to 'data/web_results'."

# 3. Code Analysis Agent (Claude + Code Search MCP) 
claude -p "You are code-agent. Subscribe to 'tasks/code_analysis'. Use searchmcp to find relevant code examples. Publish to 'data/code_results'."

# 4. Data Processing Agent (Claude + SQLite MCP)
claude -p "You are data-agent. Subscribe to 'data/+' topics. Use sqlite tools to store and analyze collected data. Publish insights to 'insights/processed'."

# 5. Report Generation Agent (Claude + File System MCP)
claude -p "You are report-agent. Subscribe to 'insights/+'. Use filesystem tools to generate comprehensive reports. Publish completion to 'workflow/complete'."

# Coordinator orchestrates the flow
mqtt_publish --topic "tasks/web_search" --payload '{"query": "AI trends 2024", "sources": 10}'
mqtt_publish --topic "tasks/code_analysis" --payload '{"focus": "LLM implementation patterns"}'

# Monitor progress across all agents
mqtt_subscribe --topic "workflow/+" --callback track_pipeline_progress

Advanced Multi-Model Patterns:

🔄 Model Chaining & Handoffs:

# GPT-4 for initial analysis → Claude for code review → Local model for privacy-sensitive data
mqtt_publish --topic "models/gpt4/analyze" --payload '{"data": "public_dataset.json"}'
# GPT-4 publishes to models/claude/review → Claude publishes to models/local/sensitive

🎭 Specialized Model Routing:

# Route different task types to optimal models
mqtt_publish --topic "routing/classify" --payload '{"task": "legal_document_analysis"}'
# Router sends to models/legal_llm/ vs models/code_llm/ vs models/creative_llm/
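One simple way a router might map task types to model topics is a lookup table. The sketch below is an assumption-laden illustration: the topic prefixes come from the example above, while the category keys, the `route_for` helper, and the rule of using the first word of the task name are hypothetical stand-ins for a real classifier.

```python
from typing import Optional

# Topic prefixes come from the routing example; the category keys are hypothetical.
ROUTES = {
    "legal": "models/legal_llm/",
    "code": "models/code_llm/",
    "creative": "models/creative_llm/",
}


def route_for(task: str) -> Optional[str]:
    """Pick a model topic prefix from the first word of a snake_case task name."""
    category = task.split("_", 1)[0].lower()
    return ROUTES.get(category)  # None means no route is known for this task


print(route_for("legal_document_analysis"))
```

A router agent subscribed to `routing/classify` could call a function like this and republish the task under the returned prefix.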

🧪 A/B Testing & Model Comparison:

# Send same prompt to multiple models for comparison
mqtt_publish --topic "models/compare" --payload '{
  "prompt": "Explain quantum computing",
  "models": ["gpt4", "claude", "gemini", "local-llama"],
  "evaluation_criteria": ["accuracy", "clarity", "depth"]
}'

🔗 MCP Server Coordination Examples:

Database + Web + Memory Pipeline:

# 1. Brave Search finds information → SQLite stores → Memory recalls context
mqtt_publish --topic "research/start" --payload '{"topic": "sustainable_energy"}'

# Web agent searches and stores
brave_search → mqtt_publish("data/raw") → sqlite_store → mqtt_publish("data/indexed")

# Memory agent builds context
memory_store → mqtt_publish("context/ready") → next_agents_process

File System + Code Search + Web Integration:

# 1. File system agent monitors project changes
filesystem_watch → mqtt_publish("code/changed") 

# 2. Code search agent analyzes patterns  
searchmcp_analyze → mqtt_publish("patterns/found")

# 3. Web agent researches best practices
web_fetch → mqtt_publish("best_practices/updated")

# 4. Integration agent updates documentation
filesystem_write_docs → mqtt_publish("docs/updated")

Real-World Multi-Model Use Cases:

  • Research Pipeline: Web search → document analysis → data extraction → report generation
  • Code Intelligence: Repository analysis → pattern detection → documentation generation → test creation
  • Content Creation: Research → outline → writing → fact-checking → editing → publishing
  • Decision Support: Data gathering → analysis → modeling → recommendation → validation
  • Quality Assurance: Multi-model review → consensus building → error detection → improvement suggestions

🎪 The MCP Server Party Benefits:

  • Specialized Expertise: Each MCP server brings unique capabilities (web search, file ops, databases, memory)
  • Fault Tolerance: If one model/server fails, workflow continues with others
  • Cost Optimization: Route expensive tasks to premium models, routine tasks to efficient models
  • Privacy Layers: Sensitive data stays with local models, public data uses cloud models
  • Scalability: Add new models and MCP servers without changing existing workflows

🔍 Common AI Model Challenges mcmqtt Solves

"How can I coordinate multiple AI agents working on the same task?"

Solution: Use mcmqtt's MQTT messaging to orchestrate agent workflows. Parent agents publish tasks to topic channels, specialized agents subscribe and process their domain, then publish results back for aggregation.

"I need to distribute work across multiple language models efficiently"

Solution: mcmqtt enables model routing and load balancing. Send tasks to models/{model_name}/queue topics and let each model process them at its own rate. Monitor models/{model_name}/status for health and performance.

"How do I build event-driven AI workflows that react to real-time data?"

Solution: Use mcmqtt's pub/sub pattern. Data sources publish to events/{category} topics, AI agents subscribe to relevant categories and process in real-time. Perfect for IoT sensors, user actions, system events.

"I want to chain different AI models for complex multi-step processing"

Solution: Create processing pipelines where each model publishes to the next stage's input topic. Example: raw_data → model_a/process → model_b/refine → model_c/finalize → results/complete.
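The chaining idea can be sketched as an ordered list of stage topics, where each stage looks up the topic it should publish to next. The stage names are taken from the example; the `PIPELINE` list and the `next_topic` helper are hypothetical and only illustrate the pattern.

```python
from typing import Optional

# Stage order taken from the example; the helper itself is hypothetical.
PIPELINE = [
    "raw_data",
    "model_a/process",
    "model_b/refine",
    "model_c/finalize",
    "results/complete",
]


def next_topic(current: str) -> Optional[str]:
    """Return the topic the current stage should publish to, or None at the end.

    Raises ValueError if the topic is not part of the pipeline.
    """
    index = PIPELINE.index(current)
    if index + 1 < len(PIPELINE):
        return PIPELINE[index + 1]
    return None


print(next_topic("model_a/process"))
```

Because the order lives in one list, inserting or removing a stage does not require changing the agents themselves.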

"How can I make my AI system fault-tolerant and handle model failures?"

Solution: mcmqtt's message persistence and multiple-subscriber patterns mean that if one model instance fails, another can pick up the work. Send messages that fail processing to dead_letter topics and apply retry logic.
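Retry logic with a dead-letter fallback might look like the sketch below. It is a minimal illustration, not mcmqtt's behavior: `process_with_retry`, the retry budget of three attempts, and the choice of `results/complete` as the success topic are assumptions. The function returns the topic and payload the caller should publish.

```python
from typing import Callable, Tuple

MAX_ATTEMPTS = 3  # assumed retry budget


def process_with_retry(
    handler: Callable[[str], str],
    payload: str,
    max_attempts: int = MAX_ATTEMPTS,
) -> Tuple[str, str]:
    """Try the handler up to max_attempts times and return (topic, payload) to publish."""
    last_error = ""
    for _ in range(max_attempts):
        try:
            return ("results/complete", handler(payload))
        except Exception as exc:  # broad on purpose: any failure counts as a failed attempt
            last_error = str(exc)
    # All attempts failed: hand the original message to the dead-letter topic.
    return ("dead_letter", f"{payload} | error: {last_error}")


print(process_with_retry(str.upper, "task"))
```

A separate agent subscribed to `dead_letter` can then log, inspect, or requeue the failed messages.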

"I need to coordinate AI agents across different machines and networks"

Solution: mcmqtt works over network connections. Deploy agents on different servers, all connecting to the same MQTT broker for seamless distributed coordination.

"How do I implement real-time collaboration between AI models?"

Solution: Use mcmqtt's instant messaging for models to share context, ask questions, and coordinate decisions. Models can publish to collaboration/{session_id} topics for real-time interaction.

"I want to build an AI system that learns from multiple agents' experiences"

Solution: Agents publish their learning insights to knowledge/{domain} topics. Other agents subscribe and incorporate shared knowledge, creating a collective intelligence system.

"How can I monitor and debug complex AI agent interactions?"

Solution: mcmqtt provides built-in message history, subscription tracking, and status monitoring. All agent communications are logged and can be analyzed for debugging and optimization.

"I need to integrate AI models with existing enterprise systems and APIs"

Solution: mcmqtt bridges AI and enterprise systems via MQTT. Legacy systems publish events, AI models process and respond, results integrate back into business workflows seamlessly.

⚙️ Configuration

Environment Variables

export MQTT_BROKER_HOST=localhost
export MQTT_BROKER_PORT=1883
export MQTT_CLIENT_ID=my-client
export MQTT_USERNAME=user
export MQTT_PASSWORD=secret
export MQTT_USE_TLS=true
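For code that needs the same settings, the variables above can be read into a small settings object. This is a sketch under stated assumptions: `MqttSettings` and `load_settings` are hypothetical names, the defaults (`localhost`, `1883`, TLS off) are assumptions, and mcmqtt's own configuration module may parse these variables differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class MqttSettings:
    host: str
    port: int
    client_id: Optional[str]
    username: Optional[str]
    password: Optional[str]
    use_tls: bool


def load_settings(env: Mapping[str, str] = os.environ) -> MqttSettings:
    """Read the MQTT_* variables; the defaults used here are assumptions."""
    return MqttSettings(
        host=env.get("MQTT_BROKER_HOST", "localhost"),
        port=int(env.get("MQTT_BROKER_PORT", "1883")),
        client_id=env.get("MQTT_CLIENT_ID"),
        username=env.get("MQTT_USERNAME"),
        password=env.get("MQTT_PASSWORD"),
        # Accept a few common spellings of "true".
        use_tls=env.get("MQTT_USE_TLS", "false").strip().lower() in {"1", "true", "yes"},
    )


settings = load_settings({"MQTT_BROKER_PORT": "8883", "MQTT_USE_TLS": "true"})
print(settings.host, settings.port, settings.use_tls)
```

Passing a plain dictionary instead of `os.environ` keeps the loader easy to test.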

Command Line Options

mcmqtt --help

Options:
  --transport [stdio|http]                 Server transport mode
  --mqtt-host TEXT                         MQTT broker hostname
  --mqtt-port INTEGER                      MQTT broker port
  --mqtt-client-id TEXT                    MQTT client identifier
  --auto-broker                            Spawn embedded broker
  --log-level [DEBUG|INFO|WARNING|ERROR]   Logging verbosity
  --log-file PATH                          Log to file

🚦 Development

Requirements

  • Python 3.11+
  • UV package manager (recommended)
  • FastMCP framework
  • Paho MQTT client

Setup

# Clone the repository
git clone https://git.supported.systems/MCP/mcmqtt.git
cd mcmqtt

# Install dependencies
uv sync

# Run tests
uv run pytest

# Build package
uv build

Testing

# Run all tests
uv run pytest tests/

# Run with coverage
uv run pytest --cov=src/mcmqtt --cov-report=html

# Test specific modules
uv run pytest tests/unit/test_cli_comprehensive.py -v

📈 Performance

  • Lightweight: Minimal memory footprint
  • Fast: Async/await throughout for maximum throughput
  • Scalable: Handle thousands of concurrent connections
  • Reliable: Comprehensive error handling and retry logic

🤝 Contributing

We love contributions! This project follows the "campground rule" - leave it better than you found it.

  1. Fork the repository
  2. Create a feature branch
  3. Add tests for new functionality
  4. Ensure all tests pass
  5. Submit a pull request

📄 License

MIT License - see LICENSE for details.

🙏 Credits

Created with ❤️ by Ryan Malloy

Built on the shoulders of giants:


Ready to revolutionize your MQTT integration? Install mcmqtt today! 🚀

uvx mcmqtt --transport stdio --auto-broker

Built with ❤️ for the AI developer community
Powered by FastMCP • Distributed via uvx • Coordinated through MQTT