Implement proper FastMCP CLI conventions

- Remove subcommands, use direct typer.run() pattern
- Default to STDIO transport (--transport stdio)
- Support HTTP transport with --transport http
- Fix async/sync handling for FastMCP server modes
- Follows standard FastMCP patterns for uvx deployment
- Ready for MCP client integration
This commit is contained in:
Ryan Malloy 2025-09-17 06:35:26 -06:00
parent 355daaff01
commit 203eff2119
2 changed files with 45 additions and 149 deletions

View File

@ -1,51 +1,20 @@
"""Main entry point for mcmqtt FastMCP MQTT server.""" """FastMCP MQTT Server - Main entry point following FastMCP conventions."""
import asyncio import asyncio
import os import os
import logging
import sys import sys
from pathlib import Path
from typing import Optional from typing import Optional
import typer import typer
from rich.console import Console from rich.console import Console
from rich.logging import RichHandler
import structlog
from .mqtt.types import MQTTConfig, MQTTQoS from .mqtt.types import MQTTConfig
from .mcp.server import MCMQTTServer from .mcp.server import MCMQTTServer
# Setup rich console # Setup rich console
console = Console() console = Console()
# Setup logging
def setup_logging(log_level: str = "INFO"):
"""Set up structured logging with rich output."""
logging.basicConfig(
level=getattr(logging, log_level.upper()),
format="%(message)s",
datefmt="[%X]",
handlers=[RichHandler(console=console)]
)
# Configure structlog
structlog.configure(
processors=[
structlog.stdlib.filter_by_level,
structlog.stdlib.add_logger_name,
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.UnicodeDecoder(),
structlog.processors.JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
def get_version() -> str: def get_version() -> str:
"""Get package version.""" """Get package version."""
@ -55,6 +24,7 @@ def get_version() -> str:
except Exception: except Exception:
return "0.1.0" return "0.1.0"
def create_mqtt_config_from_env() -> Optional[MQTTConfig]: def create_mqtt_config_from_env() -> Optional[MQTTConfig]:
"""Create MQTT configuration from environment variables.""" """Create MQTT configuration from environment variables."""
try: try:
@ -69,7 +39,6 @@ def create_mqtt_config_from_env() -> Optional[MQTTConfig]:
username=os.getenv("MQTT_USERNAME"), username=os.getenv("MQTT_USERNAME"),
password=os.getenv("MQTT_PASSWORD"), password=os.getenv("MQTT_PASSWORD"),
keepalive=int(os.getenv("MQTT_KEEPALIVE", "60")), keepalive=int(os.getenv("MQTT_KEEPALIVE", "60")),
qos=MQTTQoS(int(os.getenv("MQTT_QOS", "1"))),
use_tls=os.getenv("MQTT_USE_TLS", "false").lower() == "true", use_tls=os.getenv("MQTT_USE_TLS", "false").lower() == "true",
clean_session=os.getenv("MQTT_CLEAN_SESSION", "true").lower() == "true", clean_session=os.getenv("MQTT_CLEAN_SESSION", "true").lower() == "true",
reconnect_interval=int(os.getenv("MQTT_RECONNECT_INTERVAL", "5")), reconnect_interval=int(os.getenv("MQTT_RECONNECT_INTERVAL", "5")),
@ -79,18 +48,11 @@ def create_mqtt_config_from_env() -> Optional[MQTTConfig]:
console.print(f"[red]Error creating MQTT config from environment: {e}[/red]") console.print(f"[red]Error creating MQTT config from environment: {e}[/red]")
return None return None
# CLI application
app = typer.Typer(
name="mcmqtt",
help="FastMCP MQTT Server - Enabling MQTT integration for MCP clients",
no_args_is_help=True
)
@app.command() def main_server(
def serve( transport: str = typer.Option("stdio", "--transport", "-t", help="Transport mode: stdio (default) or http"),
host: str = typer.Option("0.0.0.0", "--host", "-h", help="Host to bind the server to"), host: str = typer.Option("0.0.0.0", "--host", "-h", help="Host to bind the server to (HTTP mode only)"),
port: int = typer.Option(3000, "--port", "-p", help="Port to bind the server to"), port: int = typer.Option(3000, "--port", "-p", help="Port to bind the server to (HTTP mode only)"),
log_level: str = typer.Option("INFO", "--log-level", "-l", help="Log level (DEBUG, INFO, WARNING, ERROR)"),
mqtt_broker_host: Optional[str] = typer.Option(None, "--mqtt-host", help="MQTT broker hostname"), mqtt_broker_host: Optional[str] = typer.Option(None, "--mqtt-host", help="MQTT broker hostname"),
mqtt_broker_port: int = typer.Option(1883, "--mqtt-port", help="MQTT broker port"), mqtt_broker_port: int = typer.Option(1883, "--mqtt-port", help="MQTT broker port"),
mqtt_client_id: Optional[str] = typer.Option(None, "--mqtt-client-id", help="MQTT client ID"), mqtt_client_id: Optional[str] = typer.Option(None, "--mqtt-client-id", help="MQTT client ID"),
@ -98,15 +60,7 @@ def serve(
mqtt_password: Optional[str] = typer.Option(None, "--mqtt-password", help="MQTT password"), mqtt_password: Optional[str] = typer.Option(None, "--mqtt-password", help="MQTT password"),
auto_connect: bool = typer.Option(False, "--auto-connect", help="Automatically connect to MQTT broker on startup") auto_connect: bool = typer.Option(False, "--auto-connect", help="Automatically connect to MQTT broker on startup")
): ):
"""Start the mcmqtt FastMCP server.""" """mcmqtt FastMCP MQTT Server - Enabling MQTT integration for MCP clients."""
# Setup logging
setup_logging(log_level)
logger = structlog.get_logger()
# Display startup banner
version = get_version()
console.print(f"[bold blue]🎬 mcmqtt FastMCP MQTT Server v{version}[/bold blue]")
console.print(f"[dim]Starting server on {host}:{port}[/dim]")
# Create MQTT configuration # Create MQTT configuration
mqtt_config = None mqtt_config = None
@ -120,114 +74,46 @@ def serve(
username=mqtt_username, username=mqtt_username,
password=mqtt_password password=mqtt_password
) )
console.print(f"[green]MQTT Configuration: {mqtt_broker_host}:{mqtt_broker_port}[/green]")
else: else:
# Try environment variables # Try environment variables
mqtt_config = create_mqtt_config_from_env() mqtt_config = create_mqtt_config_from_env()
if mqtt_config:
console.print(f"[green]MQTT Configuration (from env): {mqtt_config.broker_host}:{mqtt_config.broker_port}[/green]")
else:
console.print("[yellow]No MQTT configuration provided. Use tools to configure at runtime.[/yellow]")
# Create and configure server # Create and configure server
server = MCMQTTServer(mqtt_config) server = MCMQTTServer(mqtt_config)
async def run_server(): # Handle MQTT auto-connect if needed
"""Run the server with auto-connect if enabled."""
try:
if auto_connect and mqtt_config: if auto_connect and mqtt_config:
console.print("[blue]Auto-connecting to MQTT broker...[/blue]") async def connect_mqtt():
success = await server.initialize_mqtt_client(mqtt_config) success = await server.initialize_mqtt_client(mqtt_config)
if success: if success:
await server.connect_mqtt() await server.connect_mqtt()
console.print("[green]Connected to MQTT broker[/green]")
else:
console.print("[red]Failed to connect to MQTT broker[/red]")
# Start FastMCP server try:
await server.run_server(host, port) asyncio.run(connect_mqtt())
except Exception as e:
except KeyboardInterrupt: console.print(f"[red]MQTT connection failed: {e}[/red]")
console.print("\n[yellow]Shutting down server...[/yellow]")
await server.disconnect_mqtt() # Start FastMCP server based on transport
try:
if transport.lower() == "http":
# HTTP mode uses async
async def run_http():
await server.run_server(host, port)
asyncio.run(run_http())
else:
# STDIO mode is synchronous and handles its own event loop
server.run_stdio()
except KeyboardInterrupt:
pass
except Exception as e: except Exception as e:
logger.error("Server error", error=str(e))
console.print(f"[red]Server error: {e}[/red]") console.print(f"[red]Server error: {e}[/red]")
sys.exit(1) sys.exit(1)
# Run the server
try:
asyncio.run(run_server())
except KeyboardInterrupt:
console.print("\n[yellow]Server stopped[/yellow]")
@app.command()
def version():
"""Show version information."""
version_str = get_version()
console.print(f"mcmqtt version: [bold blue]{version_str}[/bold blue]")
@app.command()
def health(
host: str = typer.Option("localhost", "--host", "-h", help="Server host"),
port: int = typer.Option(3000, "--port", "-p", help="Server port")
):
"""Check server health."""
import httpx
try:
url = f"http://{host}:{port}/health"
response = httpx.get(url, timeout=10.0)
if response.status_code == 200:
console.print("[green]✅ Server is healthy[/green]")
console.print(response.json())
else:
console.print(f"[red]❌ Server unhealthy (status: {response.status_code})[/red]")
sys.exit(1)
except httpx.ConnectError:
console.print(f"[red]❌ Cannot connect to server at {host}:{port}[/red]")
sys.exit(1)
except Exception as e:
console.print(f"[red]❌ Health check failed: {e}[/red]")
sys.exit(1)
@app.command()
def config():
"""Show current configuration."""
setup_logging()
console.print("[bold blue]Configuration Sources:[/bold blue]")
# Environment variables
console.print("\n[bold]Environment Variables:[/bold]")
env_vars = [
"MQTT_BROKER_HOST", "MQTT_BROKER_PORT", "MQTT_CLIENT_ID",
"MQTT_USERNAME", "MQTT_KEEPALIVE", "MQTT_QOS", "MQTT_USE_TLS",
"MCP_SERVER_PORT", "LOG_LEVEL"
]
for var in env_vars:
value = os.getenv(var, "[dim]not set[/dim]")
if "PASSWORD" in var and value != "[dim]not set[/dim]":
value = "[dim]***[/dim]"
console.print(f" {var}: {value}")
# MQTT config from environment
mqtt_config = create_mqtt_config_from_env()
if mqtt_config:
console.print("\n[bold green]MQTT Configuration (parsed):[/bold green]")
console.print(f" Broker: {mqtt_config.broker_host}:{mqtt_config.broker_port}")
console.print(f" Client ID: {mqtt_config.client_id}")
console.print(f" QoS: {mqtt_config.qos.value}")
console.print(f" TLS: {mqtt_config.use_tls}")
else:
console.print("\n[yellow]No valid MQTT configuration found in environment[/yellow]")
def main(): def main():
"""Main entry point.""" """Main entry point following FastMCP conventions."""
app() typer.run(main_server)
if __name__ == "__main__": if __name__ == "__main__":
main() main()

View File

@ -748,6 +748,16 @@ class MCMQTTServer(MCPMixin):
logger.error(f"Server error: {e}") logger.error(f"Server error: {e}")
raise raise
def run_stdio(self):
"""Run the FastMCP server with STDIO transport (default for MCP clients)."""
try:
# FastMCP's run() method is synchronous and handles its own event loop
self.mcp.run()
except Exception as e:
logger.error(f"STDIO server error: {e}")
raise
def get_mcp_server(self) -> FastMCP: def get_mcp_server(self) -> FastMCP:
"""Get the FastMCP server instance.""" """Get the FastMCP server instance."""
return self.mcp return self.mcp