- Detailed AI agent orchestration with Claude Code examples - Multi-agent code analysis workflow demonstration - Advanced fractal patterns (recursive delegation, load balancing) - Real-world scenarios: IoT, microservices, gaming, DevOps, e-commerce - Shows practical value for various industries and use cases - Ready for professional PyPI publication
12 KiB
🚀 mcmqtt - FastMCP MQTT Server
The most powerful FastMCP MQTT integration server on the planet 🌍
Enabling MQTT integration for MCP clients with embedded broker support and fractal agent orchestration
✨ Key Features
- 🔥 FastMCP Integration: Native Model Context Protocol server with MQTT tools
- ⚡ Embedded MQTT Brokers: Spawn brokers on-demand with zero configuration
- 🏗️ Modular Architecture: Clean, testable, maintainable codebase
- 🧪 Comprehensive Testing: 70+ tests with 96%+ coverage on core modules
- 🌐 Cross-Platform: Designed for Linux, macOS, and Windows (TODO: Test on additional platforms)
- 🔧 CLI & Programmatic: Use via command line or integrate into your code
- 📡 Real-time Coordination: Perfect for agent swarms and distributed systems
🚀 Quick Start
Installation
Recommended: Use uvx
for instant execution (no installation needed):
# Run directly with uvx (recommended)
uvx mcmqtt --help
# Start STDIO server for MCP clients
uvx mcmqtt
# HTTP mode for web integration
uvx mcmqtt --transport http --port 8080
If you insist on traditional installation:
# Install with uv
uv add mcmqtt
# Or use pip
pip install mcmqtt
Instant MQTT Magic
# Start FastMCP MQTT server (default STDIO mode) - Just works!
uvx mcmqtt
# HTTP mode for web integration
uvx mcmqtt --transport http --port 8080
# Connect to existing broker (optional)
uvx mcmqtt --mqtt-host mqtt.example.com --mqtt-port 1883
MCP Integration
Add to your Claude Code MCP configuration:
# Add mcmqtt as an MCP server (zero configuration!)
claude mcp add task-buzz -- uvx mcmqtt
# Test the connection
claude mcp test task-buzz
🛠️ Core Features
🏃♂️ FastMCP MQTT Tools
mqtt_connect
- Connect to MQTT brokersmqtt_publish
- Publish messages with QoS supportmqtt_subscribe
- Subscribe to topics with wildcardsmqtt_get_messages
- Retrieve received messagesmqtt_status
- Get connection and statisticsmqtt_spawn_broker
- Create embedded brokers instantlymqtt_list_brokers
- Manage multiple brokers
🔧 Embedded Broker Management
from mcmqtt.broker import BrokerManager
# Spawn a broker programmatically
manager = BrokerManager()
broker_info = await manager.spawn_broker(
name="my-broker",
port=1883,
max_connections=100
)
print(f"Broker running at: {broker_info.url}")
📡 MQTT Client Integration
from mcmqtt.mqtt import MQTTClient
from mcmqtt.mqtt.types import MQTTConfig
config = MQTTConfig(
broker_host="localhost",
broker_port=1883,
client_id="my-client"
)
client = MQTTClient(config)
await client.connect()
await client.publish("sensors/temperature", "23.5")
🏗️ Architecture Excellence
This isn't your typical monolithic MQTT library. mcmqtt features a clean modular architecture:
mcmqtt/
├── cli/ # Command-line interface & argument parsing
├── config/ # Environment & configuration management
├── logging/ # Structured logging setup
├── server/ # STDIO & HTTP server runners
├── mqtt/ # Core MQTT client functionality
├── mcp/ # FastMCP server integration
├── broker/ # Embedded broker management
└── middleware/ # Broker middleware & orchestration
🧪 Testing Excellence
- 70+ comprehensive tests covering all modules
- 96%+ code coverage on refactored components
- Robust mocking for reliable CI/CD
- Edge case coverage for production reliability
🌟 Real-World Use Cases
🤖 AI Agent Orchestration & Swarm Coordination
Fractal Agent Architecture: Build sophisticated agent swarms where parent agents delegate tasks to specialized subagents via MQTT messaging.
# 1. Setup mcmqtt as MCP server for agent coordination
claude mcp add task-buzz -- uvx mcmqtt
# 2. Now parent Claude Code agents can coordinate with subagents
Example: Multi-Agent Code Analysis Workflow
# Parent Agent (you) delegates analysis tasks via MQTT:
# 1. Spawn embedded broker for coordination
mqtt_spawn_broker --name "code-analysis" --port 1883
# 2. Create specialized subagents for different domains
claude -p "You are a security-analysis-agent. Subscribe to 'tasks/security' topic and analyze code for vulnerabilities. Use mqtt_subscribe and mqtt_publish tools."
claude -p "You are a performance-agent. Subscribe to 'tasks/performance' and optimize code. Report results to 'results/performance'."
claude -p "You are a test-agent. Subscribe to 'tasks/testing' and write comprehensive tests. Publish coverage to 'results/testing'."
# 3. Parent agent coordinates the workflow
mqtt_publish --topic "tasks/security" --payload '{"file": "auth.py", "priority": "high"}'
mqtt_publish --topic "tasks/performance" --payload '{"file": "database.py", "focus": "query_optimization"}'
mqtt_publish --topic "tasks/testing" --payload '{"file": "api.py", "coverage_target": "95%"}'
# 4. Monitor results from all agents
mqtt_subscribe --topic "results/+" --callback aggregate_analysis_report
Advanced Fractal Patterns:
- Recursive Task Delegation: Subagents can spawn their own sub-subagents for complex domains
- Dynamic Load Balancing: Monitor agent workload via heartbeat topics, redistribute tasks automatically
- Cross-Agent Learning: Agents share insights via
knowledge/domain
topics - Fault Tolerance: Dead agent detection via missed heartbeats, automatic task redistribution
Use Cases:
- Large Codebase Analysis: Break down monolith analysis across specialized agents (security, performance, testing, documentation)
- Multi-Language Projects: Deploy language-specific agents (Python-agent, JavaScript-agent, Rust-agent)
- Distributed Code Review: Parallel analysis of pull requests with different focuses
- CI/CD Integration: Agents monitor git hooks, trigger builds, run tests, deploy services
📊 Real-Time IoT & Sensor Data Processing
Zero-Config IoT Integration: Perfect for collecting, processing, and routing sensor data without complex infrastructure.
# Example: Temperature monitoring system
# 1. Start mcmqtt server
uvx mcmqtt
# 2. Use MCP tools to:
# - Subscribe to sensors/+/temperature
# - Publish alerts to alerts/high-temp
# - Forward to analytics/warehouse
Use Cases:
- Smart building automation: Collect data from thermostats, motion sensors, lighting systems
- Industrial monitoring: Equipment health, production metrics, safety alerts
- Environmental tracking: Weather stations, air quality monitors, noise levels
- Agricultural IoT: Soil moisture, crop monitoring, automated irrigation
🔄 Microservice Event Streaming
Service Mesh Communication: Replace complex message queues with simple MQTT patterns for microservice coordination.
# Start dedicated MQTT server for service mesh
uvx mcmqtt --transport http --port 8080
# Services communicate via MQTT topics:
# - orders/created, orders/fulfilled
# - payments/processed, payments/failed
# - inventory/updated, inventory/low-stock
Use Cases:
- Event sourcing: Capture all state changes as MQTT events for audit trails
- Saga patterns: Coordinate complex business transactions across multiple services
- Cache invalidation: Notify services when shared data changes
- Health monitoring: Services publish heartbeats and status updates
🎮 Real-Time Gaming & Live Applications
Low-Latency Messaging: Build multiplayer games, live chat, and real-time collaboration tools.
# Game server coordination
claude mcp add game-events -- uvx mcmqtt --transport http
Use Cases:
- Multiplayer game state: Player positions, actions, world updates
- Live chat systems: Message broadcasting, user presence, typing indicators
- Collaborative editing: Document changes, cursor positions, user selections
- Live streaming: Chat messages, viewer counts, donations, moderation events
🏭 DevOps & Infrastructure Automation
Deployment Orchestration: Coordinate deployments, monitoring, and incident response across infrastructure.
# Infrastructure coordination
uvx mcmqtt --mqtt-host production-mqtt.company.com
Use Cases:
- CI/CD pipelines: Build status, deployment triggers, rollback coordination
- Container orchestration: Service discovery, load balancer updates, scaling events
- Monitoring alerts: Error spikes, performance degradation, security incidents
- Backup coordination: Database snapshots, file synchronization, disaster recovery
🛍️ E-commerce & Business Process Automation
Order Processing Workflows: Handle complex business processes with event-driven patterns.
Use Cases:
- Order fulfillment: Inventory checks → payment processing → shipping → delivery notifications
- Customer journey: Cart abandonment → email campaigns → purchase → support follow-up
- Supply chain: Supplier updates → inventory management → demand forecasting
- Fraud detection: Transaction monitoring → risk scoring → manual review triggers
📱 Mobile & Edge Computing
Offline-First Applications: Handle intermittent connectivity and edge computing scenarios.
Use Cases:
- Mobile app synchronization: User data, settings, offline queue processing
- Edge device coordination: Factory floor devices, retail kiosks, field equipment
- Bandwidth optimization: Intelligent data batching, compression, prioritization
- Mesh networking: Device-to-device communication, local data processing
⚙️ Configuration
Environment Variables
export MQTT_BROKER_HOST=localhost
export MQTT_BROKER_PORT=1883
export MQTT_CLIENT_ID=my-client
export MQTT_USERNAME=user
export MQTT_PASSWORD=secret
export MQTT_USE_TLS=true
Command Line Options
mcmqtt --help
Options:
--transport [stdio|http] Server transport mode
--mqtt-host TEXT MQTT broker hostname
--mqtt-port INTEGER MQTT broker port
--mqtt-client-id TEXT MQTT client identifier
--auto-broker Spawn embedded broker
--log-level [DEBUG|INFO|WARNING|ERROR]
--log-file PATH Log to file
🚦 Development
Requirements
- Python 3.11+
- UV package manager (recommended)
- FastMCP framework
- Paho MQTT client
Setup
# Clone the repository
git clone https://git.supported.systems/MCP/mcmqtt.git
cd mcmqtt
# Install dependencies
uv sync
# Run tests
uv run pytest
# Build package
uv build
Testing
# Run all tests
uv run pytest tests/
# Run with coverage
uv run pytest --cov=src/mcmqtt --cov-report=html
# Test specific modules
uv run pytest tests/unit/test_cli_comprehensive.py -v
📈 Performance
- Lightweight: Minimal memory footprint
- Fast: Async/await throughout for maximum throughput
- Scalable: Handle thousands of concurrent connections
- Reliable: Comprehensive error handling and retry logic
🤝 Contributing
We love contributions! This project follows the "campground rule" - leave it better than you found it.
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request
📄 License
MIT License - see LICENSE for details.
🙏 Credits
Created with ❤️ by Ryan Malloy
Built on the shoulders of giants:
Ready to revolutionize your MQTT integration? Install mcmqtt today! 🚀
uvx mcmqtt --transport stdio --auto-broker