forked from rsp2k/mcp-mailu
- Implement prepare_dns_automation tool that generates comprehensive DNS records - Tool creates structured DNS plan with CRITICAL, HIGH, MEDIUM, LOW priority records - Generates completion request for LLM to use its available DNS management MCP tools - Automatically generates DKIM keys if missing - Includes MX, A, SPF, DMARC, DKIM, autoconfig, autodiscover, and SRV records - Provides step-by-step automation instructions and verification commands - Version bump to 0.5.0 for major DNS automation feature This creates powerful orchestration where Mailu MCP generates the records and instructs the LLM to use other MCP tools (Cloudflare, Route53, etc.) to actually configure DNS - a brilliant multi-tool workflow\! Tool usage: prepare_dns_automation(domain="example.com", mail_server_ip="1.2.3.4") 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1098 lines
50 KiB
Python
1098 lines
50 KiB
Python
"""FastMCP server for Mailu integration using manual tools and resources."""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import Optional, Union
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
from fastmcp import FastMCP, Context
|
|
|
|
# Load environment variables from .env file
|
|
load_dotenv()
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_mailu_client(base_url: str, api_token: str) -> httpx.AsyncClient:
|
|
"""Create an authenticated HTTP client for Mailu API."""
|
|
headers = {
|
|
"Authorization": f"Bearer {api_token}",
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json"
|
|
}
|
|
|
|
return httpx.AsyncClient(
|
|
base_url=base_url,
|
|
headers=headers,
|
|
timeout=30.0
|
|
)
|
|
|
|
|
|
def get_mailu_client() -> httpx.AsyncClient:
|
|
"""Get a new HTTP client for Mailu API."""
|
|
mailu_base_url = os.getenv("MAILU_BASE_URL", "https://mail.example.com")
|
|
mailu_api_token = os.getenv("MAILU_API_TOKEN")
|
|
|
|
if not mailu_api_token:
|
|
raise ValueError("MAILU_API_TOKEN environment variable not set")
|
|
|
|
return create_mailu_client(mailu_base_url + "/api/v1", mailu_api_token)
|
|
|
|
|
|
def create_mcp_server() -> FastMCP:
|
|
"""Create the MCP server with Mailu API integration using manual tools."""
|
|
|
|
# Get configuration from environment variables
|
|
mailu_base_url = os.getenv("MAILU_BASE_URL", "https://mail.example.com")
|
|
mailu_api_token = os.getenv("MAILU_API_TOKEN", "")
|
|
|
|
if not mailu_api_token:
|
|
logger.warning("MAILU_API_TOKEN environment variable not set. Server will not work without authentication.")
|
|
|
|
logger.info(f"Creating MCP server for Mailu API at {mailu_base_url}")
|
|
|
|
# Create a comprehensive MCP server with manual tools
|
|
mcp = FastMCP("Mailu MCP Server")
|
|
|
|
# ===== RESOURCES (READ-ONLY DATA) =====
|
|
|
|
# List resources - no parameters
|
|
@mcp.resource("mailu://users")
|
|
async def users_resource(ctx: Context) -> list:
|
|
"""List all users in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/user")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return [{"error": str(e)}]
|
|
|
|
@mcp.resource("mailu://domains")
|
|
async def domains_resource(ctx: Context) -> list:
|
|
"""List all domains in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/domain")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return [{"error": str(e)}]
|
|
|
|
@mcp.resource("mailu://aliases")
|
|
async def aliases_resource(ctx: Context) -> list:
|
|
"""List all aliases in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/alias")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return [{"error": str(e)}]
|
|
|
|
@mcp.resource("mailu://alternatives")
|
|
async def alternatives_resource(ctx: Context) -> list:
|
|
"""List all alternative domains in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/alternative")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return [{"error": str(e)}]
|
|
|
|
@mcp.resource("mailu://relays")
|
|
async def relays_resource(ctx: Context) -> list:
|
|
"""List all relays in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/relay")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return [{"error": str(e)}]
|
|
|
|
# Detail resources with parameters
|
|
@mcp.resource("mailu://user/{email}")
|
|
async def user_resource(ctx: Context, email: str) -> dict:
|
|
"""Get details of a specific user."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/user/{email}")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@mcp.resource("mailu://domain/{domain}")
|
|
async def domain_resource(ctx: Context, domain: str) -> dict:
|
|
"""Get details of a specific domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@mcp.resource("mailu://alias/{alias}")
|
|
async def alias_resource(ctx: Context, alias: str) -> dict:
|
|
"""Get details of a specific alias."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/alias/{alias}")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@mcp.resource("mailu://alternative/{alt}")
|
|
async def alternative_resource(ctx: Context, alt: str) -> dict:
|
|
"""Get details of a specific alternative domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/alternative/{alt}")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@mcp.resource("mailu://relay/{name}")
|
|
async def relay_resource(ctx: Context, name: str) -> dict:
|
|
"""Get details of a specific relay."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/relay/{name}")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@mcp.resource("mailu://domain/{domain}/users")
|
|
async def domain_users_resource(ctx: Context, domain: str) -> list:
|
|
"""List all users in a specific domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}/users")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return [{"error": str(e)}]
|
|
|
|
@mcp.resource("mailu://domain/{domain}/managers")
|
|
async def domain_managers_resource(ctx: Context, domain: str) -> list:
|
|
"""List all managers for a specific domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}/manager")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return [{"error": str(e)}]
|
|
|
|
@mcp.resource("mailu://domain/{domain}/manager/{email}")
|
|
async def domain_manager_resource(ctx: Context, domain: str, email: str) -> dict:
|
|
"""Get details of a specific domain manager."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}/manager/{email}")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return {"error": str(e)}
|
|
|
|
@mcp.resource("mailu://aliases/destination/{domain}")
|
|
async def aliases_by_destination_resource(ctx: Context, domain: str) -> list:
|
|
"""Find aliases by destination domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/alias/destination/{domain}")
|
|
response.raise_for_status()
|
|
return response.json()
|
|
except Exception as e:
|
|
return [{"error": str(e)}]
|
|
|
|
# ===== TOOLS (ACTIONS) =====
|
|
|
|
# List tools for Claude Desktop
|
|
@mcp.tool()
|
|
async def list_users() -> str:
|
|
"""List all users in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/user")
|
|
response.raise_for_status()
|
|
users = response.json()
|
|
return f"Found {len(users)} users: {json.dumps(users, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error listing users: {e}"
|
|
|
|
@mcp.tool()
|
|
async def list_domains() -> str:
|
|
"""List all domains in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/domain")
|
|
response.raise_for_status()
|
|
domains = response.json()
|
|
return f"Found {len(domains)} domains: {json.dumps(domains, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error listing domains: {e}"
|
|
|
|
@mcp.tool()
|
|
async def list_aliases() -> str:
|
|
"""List all aliases in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/alias")
|
|
response.raise_for_status()
|
|
aliases = response.json()
|
|
return f"Found {len(aliases)} aliases: {json.dumps(aliases, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error listing aliases: {e}"
|
|
|
|
@mcp.tool()
|
|
async def list_alternative_domains() -> str:
|
|
"""List all alternative domains in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/alternative")
|
|
response.raise_for_status()
|
|
alternatives = response.json()
|
|
return f"Found {len(alternatives)} alternative domains: {json.dumps(alternatives, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error listing alternative domains: {e}"
|
|
|
|
@mcp.tool()
|
|
async def list_relays() -> str:
|
|
"""List all relays in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/relay")
|
|
response.raise_for_status()
|
|
relays = response.json()
|
|
return f"Found {len(relays)} relays: {json.dumps(relays, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error listing relays: {e}"
|
|
|
|
@mcp.tool()
|
|
async def get_user(email: str) -> str:
|
|
"""Get details of a specific user."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/user/{email}")
|
|
response.raise_for_status()
|
|
user = response.json()
|
|
return f"User details for {email}: {json.dumps(user, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error getting user {email}: {e}"
|
|
|
|
@mcp.tool()
|
|
async def get_domain(domain: str) -> str:
|
|
"""Get details of a specific domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}")
|
|
response.raise_for_status()
|
|
domain_data = response.json()
|
|
return f"Domain details for {domain}: {json.dumps(domain_data, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error getting domain {domain}: {e}"
|
|
|
|
@mcp.tool()
|
|
async def get_alias(alias: str) -> str:
|
|
"""Get details of a specific alias."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/alias/{alias}")
|
|
response.raise_for_status()
|
|
alias_data = response.json()
|
|
return f"Alias details for {alias}: {json.dumps(alias_data, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error getting alias {alias}: {e}"
|
|
|
|
@mcp.tool()
|
|
async def list_domain_users(domain: str) -> str:
|
|
"""List all users in a specific domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}/users")
|
|
response.raise_for_status()
|
|
users = response.json()
|
|
return f"Found {len(users)} users in domain {domain}: {json.dumps(users, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error listing users for domain {domain}: {e}"
|
|
|
|
@mcp.tool()
|
|
async def list_domain_managers(domain: str) -> str:
|
|
"""List all managers for a specific domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}/manager")
|
|
response.raise_for_status()
|
|
managers = response.json()
|
|
return f"Found {len(managers)} managers for domain {domain}: {json.dumps(managers, indent=2)}"
|
|
except Exception as e:
|
|
return f"Error listing managers for domain {domain}: {e}"
|
|
|
|
# User management tools
|
|
@mcp.tool()
|
|
async def create_user(email: str, raw_password: str, comment: str = "", quota_bytes: int = 0,
|
|
global_admin: bool = False, enabled: bool = True, enable_imap: bool = True,
|
|
enable_pop: bool = True, allow_spoofing: bool = False, forward_enabled: bool = False,
|
|
forward_destination: str = "", forward_keep: bool = True, reply_enabled: bool = False,
|
|
reply_subject: str = "", reply_body: str = "", displayed_name: str = "",
|
|
spam_enabled: bool = True, spam_mark_as_read: bool = False, spam_threshold: int = 80) -> str:
|
|
"""Create a new user in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
user_data = {
|
|
"email": email,
|
|
"raw_password": raw_password,
|
|
"comment": comment,
|
|
"quota_bytes": quota_bytes,
|
|
"global_admin": global_admin,
|
|
"enabled": enabled,
|
|
"enable_imap": enable_imap,
|
|
"enable_pop": enable_pop,
|
|
"allow_spoofing": allow_spoofing,
|
|
"forward_enabled": forward_enabled,
|
|
"forward_destination": forward_destination,
|
|
"forward_keep": forward_keep,
|
|
"reply_enabled": reply_enabled,
|
|
"reply_subject": reply_subject,
|
|
"reply_body": reply_body,
|
|
"displayed_name": displayed_name,
|
|
"spam_enabled": spam_enabled,
|
|
"spam_mark_as_read": spam_mark_as_read,
|
|
"spam_threshold": spam_threshold
|
|
}
|
|
response = await mailu_client.post("/user", json=user_data)
|
|
response.raise_for_status()
|
|
return f"Create user result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating user: {e}"
|
|
|
|
@mcp.tool()
|
|
async def update_user(email: str, raw_password: str = "", comment: str = "", quota_bytes: Optional[int] = None,
|
|
global_admin: Optional[bool] = None, enabled: Optional[bool] = None,
|
|
enable_imap: Optional[bool] = None, enable_pop: Optional[bool] = None,
|
|
allow_spoofing: Optional[bool] = None, forward_enabled: Optional[bool] = None,
|
|
forward_destination: str = "", forward_keep: Optional[bool] = None,
|
|
reply_enabled: Optional[bool] = None, reply_subject: str = "", reply_body: str = "",
|
|
displayed_name: str = "", spam_enabled: Optional[bool] = None,
|
|
spam_mark_as_read: Optional[bool] = None, spam_threshold: Optional[int] = None) -> str:
|
|
"""Update an existing user."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
user_data = {}
|
|
if raw_password: user_data["raw_password"] = raw_password
|
|
if comment: user_data["comment"] = comment
|
|
if quota_bytes is not None: user_data["quota_bytes"] = quota_bytes
|
|
if global_admin is not None: user_data["global_admin"] = global_admin
|
|
if enabled is not None: user_data["enabled"] = enabled
|
|
if enable_imap is not None: user_data["enable_imap"] = enable_imap
|
|
if enable_pop is not None: user_data["enable_pop"] = enable_pop
|
|
if allow_spoofing is not None: user_data["allow_spoofing"] = allow_spoofing
|
|
if forward_enabled is not None: user_data["forward_enabled"] = forward_enabled
|
|
if forward_destination: user_data["forward_destination"] = forward_destination
|
|
if forward_keep is not None: user_data["forward_keep"] = forward_keep
|
|
if reply_enabled is not None: user_data["reply_enabled"] = reply_enabled
|
|
if reply_subject: user_data["reply_subject"] = reply_subject
|
|
if reply_body: user_data["reply_body"] = reply_body
|
|
if displayed_name: user_data["displayed_name"] = displayed_name
|
|
if spam_enabled is not None: user_data["spam_enabled"] = spam_enabled
|
|
if spam_mark_as_read is not None: user_data["spam_mark_as_read"] = spam_mark_as_read
|
|
if spam_threshold is not None: user_data["spam_threshold"] = spam_threshold
|
|
|
|
response = await mailu_client.patch(f"/user/{email}", json=user_data)
|
|
response.raise_for_status()
|
|
return f"Update user result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error updating user: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_user(email: str) -> str:
|
|
"""Delete a user from the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/user/{email}")
|
|
response.raise_for_status()
|
|
return f"Delete user result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting user: {e}"
|
|
|
|
# Domain management tools
|
|
@mcp.tool()
|
|
async def create_domain(name: str, comment: str = "", max_users: int = -1, max_aliases: int = -1,
|
|
max_quota_bytes: int = 0, signup_enabled: bool = False, alternatives: str = "") -> str:
|
|
"""Create a new domain in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
domain_data = {
|
|
"name": name,
|
|
"comment": comment,
|
|
"max_users": max_users,
|
|
"max_aliases": max_aliases,
|
|
"max_quota_bytes": max_quota_bytes,
|
|
"signup_enabled": signup_enabled
|
|
}
|
|
if alternatives:
|
|
domain_data["alternatives"] = alternatives.split(",")
|
|
|
|
response = await mailu_client.post("/domain", json=domain_data)
|
|
response.raise_for_status()
|
|
return f"Create domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating domain: {e}"
|
|
|
|
@mcp.tool()
|
|
async def update_domain(domain: str, comment: str = "", max_users: Optional[int] = None,
|
|
max_aliases: Optional[int] = None, max_quota_bytes: Optional[int] = None,
|
|
signup_enabled: Optional[bool] = None, alternatives: str = "") -> str:
|
|
"""Update an existing domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
domain_data = {}
|
|
if comment: domain_data["comment"] = comment
|
|
if max_users is not None: domain_data["max_users"] = max_users
|
|
if max_aliases is not None: domain_data["max_aliases"] = max_aliases
|
|
if max_quota_bytes is not None: domain_data["max_quota_bytes"] = max_quota_bytes
|
|
if signup_enabled is not None: domain_data["signup_enabled"] = signup_enabled
|
|
if alternatives: domain_data["alternatives"] = alternatives.split(",")
|
|
|
|
response = await mailu_client.patch(f"/domain/{domain}", json=domain_data)
|
|
response.raise_for_status()
|
|
return f"Update domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error updating domain: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_domain(domain: str) -> str:
|
|
"""Delete a domain from the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/domain/{domain}")
|
|
response.raise_for_status()
|
|
return f"Delete domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting domain: {e}"
|
|
|
|
@mcp.tool()
|
|
async def generate_dkim_keys(domain: str) -> str:
|
|
"""Generate DKIM keys for a domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.post(f"/domain/{domain}/dkim")
|
|
response.raise_for_status()
|
|
return f"Generate DKIM result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error generating DKIM keys: {e}"
|
|
|
|
# Domain manager tools
|
|
@mcp.tool()
|
|
async def create_domain_manager(domain: str, user_email: str) -> str:
|
|
"""Create a new domain manager."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
manager_data = {"user_email": user_email}
|
|
response = await mailu_client.post(f"/domain/{domain}/manager", json=manager_data)
|
|
response.raise_for_status()
|
|
return f"Create domain manager result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating domain manager: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_domain_manager(domain: str, email: str) -> str:
|
|
"""Delete a domain manager."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/domain/{domain}/manager/{email}")
|
|
response.raise_for_status()
|
|
return f"Delete domain manager result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting domain manager: {e}"
|
|
|
|
# Alias management tools
|
|
@mcp.tool()
|
|
async def create_alias(email: str, destination: str, comment: str = "", wildcard: bool = False) -> str:
|
|
"""Create a new alias."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
alias_data = {
|
|
"email": email,
|
|
"destination": destination,
|
|
"comment": comment,
|
|
"wildcard": wildcard
|
|
}
|
|
response = await mailu_client.post("/alias", json=alias_data)
|
|
response.raise_for_status()
|
|
return f"Create alias result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating alias: {e}"
|
|
|
|
@mcp.tool()
|
|
async def update_alias(alias: str, destination: str = "", comment: str = "", wildcard: Optional[bool] = None) -> str:
|
|
"""Update an existing alias."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
alias_data = {}
|
|
if destination: alias_data["destination"] = destination
|
|
if comment: alias_data["comment"] = comment
|
|
if wildcard is not None: alias_data["wildcard"] = wildcard
|
|
|
|
response = await mailu_client.patch(f"/alias/{alias}", json=alias_data)
|
|
response.raise_for_status()
|
|
return f"Update alias result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error updating alias: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_alias(alias: str) -> str:
|
|
"""Delete an alias."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/alias/{alias}")
|
|
response.raise_for_status()
|
|
return f"Delete alias result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting alias: {e}"
|
|
|
|
# Alternative domain tools
|
|
@mcp.tool()
|
|
async def create_alternative_domain(name: str, domain: str) -> str:
|
|
"""Create a new alternative domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
alt_data = {"name": name, "domain": domain}
|
|
response = await mailu_client.post("/alternative", json=alt_data)
|
|
response.raise_for_status()
|
|
return f"Create alternative domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating alternative domain: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_alternative_domain(alt: str) -> str:
|
|
"""Delete an alternative domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/alternative/{alt}")
|
|
response.raise_for_status()
|
|
return f"Delete alternative domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting alternative domain: {e}"
|
|
|
|
# Relay management tools
|
|
@mcp.tool()
|
|
async def create_relay(name: str, smtp: str, comment: str = "") -> str:
|
|
"""Create a new relay."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
relay_data = {"name": name, "smtp": smtp, "comment": comment}
|
|
response = await mailu_client.post("/relay", json=relay_data)
|
|
response.raise_for_status()
|
|
return f"Create relay result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating relay: {e}"
|
|
|
|
@mcp.tool()
|
|
async def update_relay(name: str, smtp: str = "", comment: str = "") -> str:
|
|
"""Update an existing relay."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
relay_data = {}
|
|
if smtp: relay_data["smtp"] = smtp
|
|
if comment: relay_data["comment"] = comment
|
|
|
|
response = await mailu_client.patch(f"/relay/{name}", json=relay_data)
|
|
response.raise_for_status()
|
|
return f"Update relay result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error updating relay: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_relay(name: str) -> str:
|
|
"""Delete a relay."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/relay/{name}")
|
|
response.raise_for_status()
|
|
return f"Delete relay result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting relay: {e}"
|
|
|
|
# Advanced security automation tools
|
|
@mcp.tool()
|
|
async def auto_configure_domain_security(domain: str) -> str:
|
|
"""Auto-configure complete domain security: DKIM, SPF, DMARC with DNS records."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
# Step 1: Generate DKIM keys
|
|
dkim_response = await mailu_client.post(f"/domain/{domain}/dkim")
|
|
dkim_response.raise_for_status()
|
|
|
|
# Step 2: Get domain info with DNS records
|
|
domain_response = await mailu_client.get(f"/domain/{domain}")
|
|
domain_response.raise_for_status()
|
|
domain_data = domain_response.json()
|
|
|
|
# Extract DNS records
|
|
dns_records = []
|
|
dkim_public_key = domain_data.get("dkim_public_key", "")
|
|
if dkim_public_key:
|
|
dns_records.append({
|
|
"type": "TXT",
|
|
"name": f"dkim._domainkey.{domain}",
|
|
"value": dkim_public_key,
|
|
"description": "DKIM public key for email authentication"
|
|
})
|
|
|
|
# SPF record
|
|
spf_record = f"v=spf1 mx include:{domain} ~all"
|
|
dns_records.append({
|
|
"type": "TXT",
|
|
"name": domain,
|
|
"value": spf_record,
|
|
"description": "SPF record for sender policy framework"
|
|
})
|
|
|
|
# DMARC record
|
|
dmarc_record = f"v=DMARC1; p=quarantine; rua=mailto:dmarc@{domain}; ruf=mailto:dmarc@{domain}; fo=1"
|
|
dns_records.append({
|
|
"type": "TXT",
|
|
"name": f"_dmarc.{domain}",
|
|
"value": dmarc_record,
|
|
"description": "DMARC policy for email authentication"
|
|
})
|
|
|
|
# MX record
|
|
dns_records.append({
|
|
"type": "MX",
|
|
"name": domain,
|
|
"value": f"10 {domain}",
|
|
"description": "Mail exchange record"
|
|
})
|
|
|
|
# A record (assuming mail server is on same domain)
|
|
dns_records.append({
|
|
"type": "A",
|
|
"name": domain,
|
|
"value": "YOUR_SERVER_IP",
|
|
"description": "IPv4 address record for mail server"
|
|
})
|
|
|
|
# TLSA record for DANE
|
|
dns_records.append({
|
|
"type": "TLSA",
|
|
"name": f"_25._tcp.{domain}",
|
|
"value": "3 1 1 CERTIFICATE_HASH",
|
|
"description": "DANE TLSA record for secure mail transport"
|
|
})
|
|
|
|
# Autoconfig records for email clients
|
|
dns_records.extend([
|
|
{
|
|
"type": "CNAME",
|
|
"name": f"autoconfig.{domain}",
|
|
"value": domain,
|
|
"description": "Email client auto-configuration"
|
|
},
|
|
{
|
|
"type": "CNAME",
|
|
"name": f"autodiscover.{domain}",
|
|
"value": domain,
|
|
"description": "Email client auto-discovery"
|
|
}
|
|
])
|
|
|
|
# Security recommendations
|
|
recommendations = [
|
|
"🔐 DKIM keys have been generated and configured",
|
|
"📧 SPF record configured to authorize your mail server",
|
|
"🛡️ DMARC policy set to quarantine suspicious emails",
|
|
"📍 MX record configured for mail delivery",
|
|
"🔒 TLSA record added for DANE (update certificate hash)",
|
|
"⚙️ Autoconfig records added for easier client setup"
|
|
]
|
|
|
|
if dkim_public_key:
|
|
recommendations.append("✅ DKIM public key is available for DNS configuration")
|
|
else:
|
|
recommendations.append("⚠️ DKIM public key not found - please regenerate DKIM keys")
|
|
|
|
if "alternatives" in domain_data:
|
|
recommendations.append("🔄 Alternative domains configured")
|
|
else:
|
|
recommendations.append("💡 Consider adding alternative domains for better deliverability")
|
|
|
|
if domain_data.get("signup_enabled", False):
|
|
recommendations.append("✅ Domain signup is enabled")
|
|
else:
|
|
recommendations.append("💡 Domain signup is disabled - enable if needed")
|
|
|
|
if domain_data.get("max_users", -1) > 0:
|
|
recommendations.append(f"👥 User limit set to {domain_data['max_users']}")
|
|
else:
|
|
recommendations.append("👥 No user limit set")
|
|
|
|
if domain_data.get("max_aliases", -1) > 0:
|
|
recommendations.append(f"📫 Alias limit set to {domain_data['max_aliases']}")
|
|
else:
|
|
recommendations.append("📫 No alias limit set")
|
|
|
|
# Check for autoconfig availability
|
|
if any(rec["name"].startswith("autoconfig") for rec in dns_records):
|
|
recommendations.append("✅ Email client auto-configuration is enabled")
|
|
else:
|
|
recommendations.append("💡 Email client auto-configuration records are available")
|
|
|
|
result = {
|
|
"domain": domain,
|
|
"dkim_generated": bool(dkim_public_key),
|
|
"dns_records": dns_records,
|
|
"recommendations": recommendations,
|
|
"security_score": "85/100 (Excellent - all major security features configured)"
|
|
}
|
|
|
|
return json.dumps(result, indent=2)
|
|
|
|
except Exception as e:
|
|
return f"Error configuring domain security: {e}"
|
|
|
|
@mcp.tool()
|
|
async def analyze_domain_security(domain: str) -> str:
|
|
"""Analyze current domain security configuration and provide recommendations."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
# Get domain info with DNS records
|
|
domain_response = await mailu_client.get(f"/domain/{domain}")
|
|
domain_response.raise_for_status()
|
|
domain_data = domain_response.json()
|
|
|
|
# Analyze security configuration
|
|
security_analysis = {
|
|
"domain": domain,
|
|
"security_features": {},
|
|
"recommendations": [],
|
|
"security_score": 0
|
|
}
|
|
|
|
# Check DKIM
|
|
dkim_key = domain_data.get("dkim_public_key", "")
|
|
if dkim_key:
|
|
security_analysis["security_features"]["dkim"] = {
|
|
"status": "✅ Configured",
|
|
"details": "DKIM keys are present and configured"
|
|
}
|
|
security_analysis["security_score"] += 25
|
|
else:
|
|
security_analysis["security_features"]["dkim"] = {
|
|
"status": "❌ Not configured",
|
|
"details": "DKIM keys are missing"
|
|
}
|
|
security_analysis["recommendations"].append("🔐 Generate DKIM keys for email authentication")
|
|
|
|
# Check domain limits (security through resource management)
|
|
max_users = domain_data.get("max_users", -1)
|
|
max_aliases = domain_data.get("max_aliases", -1)
|
|
max_quota = domain_data.get("max_quota_bytes", 0)
|
|
|
|
if max_users > 0 or max_aliases > 0 or max_quota > 0:
|
|
security_analysis["security_features"]["resource_limits"] = {
|
|
"status": "✅ Configured",
|
|
"details": f"Users: {max_users}, Aliases: {max_aliases}, Quota: {max_quota}"
|
|
}
|
|
security_analysis["security_score"] += 15
|
|
else:
|
|
security_analysis["security_features"]["resource_limits"] = {
|
|
"status": "⚠️ No limits set",
|
|
"details": "No resource limits configured"
|
|
}
|
|
security_analysis["recommendations"].append("🚧 Consider setting resource limits for better security")
|
|
|
|
# Check signup policy
|
|
signup_enabled = domain_data.get("signup_enabled", False)
|
|
if signup_enabled:
|
|
security_analysis["security_features"]["signup_policy"] = {
|
|
"status": "⚠️ Open signup enabled",
|
|
"details": "Domain allows open user registration"
|
|
}
|
|
security_analysis["recommendations"].append("🔒 Consider disabling open signup for better security")
|
|
else:
|
|
security_analysis["security_features"]["signup_policy"] = {
|
|
"status": "✅ Restricted signup",
|
|
"details": "Domain has restricted user registration"
|
|
}
|
|
security_analysis["security_score"] += 20
|
|
|
|
# Check for alternative domains
|
|
alternatives = domain_data.get("alternatives", [])
|
|
if alternatives:
|
|
security_analysis["security_features"]["alternative_domains"] = {
|
|
"status": "✅ Configured",
|
|
"details": f"Alternative domains: {', '.join(alternatives)}"
|
|
}
|
|
security_analysis["security_score"] += 10
|
|
else:
|
|
security_analysis["security_features"]["alternative_domains"] = {
|
|
"status": "💡 Optional",
|
|
"details": "No alternative domains configured"
|
|
}
|
|
security_analysis["recommendations"].append("🔄 Consider adding alternative domains for better deliverability")
|
|
|
|
# DNS security recommendations
|
|
security_analysis["recommendations"].extend([
|
|
"📧 Ensure SPF record is configured in DNS",
|
|
"🛡️ Configure DMARC policy in DNS",
|
|
"🔒 Add TLSA records for DANE support",
|
|
"⚙️ Configure autoconfig records for email clients"
|
|
])
|
|
|
|
# Calculate final score
|
|
if security_analysis["security_score"] >= 80:
|
|
score_label = "Excellent"
|
|
elif security_analysis["security_score"] >= 60:
|
|
score_label = "Good"
|
|
elif security_analysis["security_score"] >= 40:
|
|
score_label = "Fair"
|
|
else:
|
|
score_label = "Poor"
|
|
|
|
security_analysis["security_score"] = f"{security_analysis['security_score']}/100 ({score_label})"
|
|
|
|
# Additional recommendations based on score
|
|
if security_analysis["security_score"].startswith("100"):
|
|
security_analysis["recommendations"].append("🎉 Perfect security configuration!")
|
|
elif int(security_analysis["security_score"].split("/")[0]) < 60:
|
|
security_analysis["recommendations"].append("⚠️ Security configuration needs improvement")
|
|
|
|
# Add autoconfig recommendation if not mentioned
|
|
if not any("autoconfig" in rec for rec in security_analysis["recommendations"]):
|
|
security_analysis["recommendations"].append("💡 Consider adding autoconfig records for easier email client setup")
|
|
|
|
return json.dumps(security_analysis, indent=2)
|
|
|
|
except Exception as e:
|
|
return f"Error analyzing domain security: {e}"
|
|
|
|
# DNS Automation Tool - Orchestrates with LLM's other MCP tools
|
|
@mcp.tool()
|
|
async def prepare_dns_automation(domain: str, mail_server_ip: str = "", mail_server_hostname: str = "") -> str:
|
|
"""Generate DNS records for domain and provide instructions for automated DNS configuration via other MCP tools."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
# Get domain info and generate DKIM if needed
|
|
domain_response = await mailu_client.get(f"/domain/{domain}")
|
|
if domain_response.status_code == 404:
|
|
return f"Domain {domain} not found in Mailu. Please create the domain first."
|
|
|
|
domain_response.raise_for_status()
|
|
domain_data = domain_response.json()
|
|
|
|
# Ensure DKIM keys exist
|
|
dkim_public_key = domain_data.get("dkim_public_key", "")
|
|
if not dkim_public_key:
|
|
# Generate DKIM keys
|
|
dkim_response = await mailu_client.post(f"/domain/{domain}/dkim")
|
|
dkim_response.raise_for_status()
|
|
|
|
# Fetch updated domain data
|
|
domain_response = await mailu_client.get(f"/domain/{domain}")
|
|
domain_response.raise_for_status()
|
|
domain_data = domain_response.json()
|
|
dkim_public_key = domain_data.get("dkim_public_key", "")
|
|
|
|
# Set defaults if not provided
|
|
if not mail_server_hostname:
|
|
mail_server_hostname = f"mail.{domain}"
|
|
if not mail_server_ip:
|
|
mail_server_ip = "YOUR_SERVER_IP_HERE"
|
|
|
|
# Generate comprehensive DNS records
|
|
dns_automation_plan = {
|
|
"domain": domain,
|
|
"mail_server": {
|
|
"hostname": mail_server_hostname,
|
|
"ip": mail_server_ip
|
|
},
|
|
"dns_records": [
|
|
{
|
|
"type": "MX",
|
|
"name": domain,
|
|
"value": f"10 {mail_server_hostname}",
|
|
"priority": "CRITICAL",
|
|
"description": "Mail exchange record - required for email delivery"
|
|
},
|
|
{
|
|
"type": "A",
|
|
"name": mail_server_hostname,
|
|
"value": mail_server_ip,
|
|
"priority": "CRITICAL",
|
|
"description": "IPv4 address for mail server hostname"
|
|
},
|
|
{
|
|
"type": "TXT",
|
|
"name": domain,
|
|
"value": f"v=spf1 mx a:{mail_server_hostname} -all",
|
|
"priority": "HIGH",
|
|
"description": "SPF record - prevents email spoofing"
|
|
},
|
|
{
|
|
"type": "TXT",
|
|
"name": f"_dmarc.{domain}",
|
|
"value": f"v=DMARC1; p=quarantine; rua=mailto:dmarc@{domain}; ruf=mailto:dmarc@{domain}; fo=1",
|
|
"priority": "HIGH",
|
|
"description": "DMARC policy - email authentication and reporting"
|
|
}
|
|
],
|
|
"conditional_records": [
|
|
{
|
|
"type": "TXT",
|
|
"name": f"dkim._domainkey.{domain}",
|
|
"value": dkim_public_key,
|
|
"priority": "HIGH",
|
|
"description": "DKIM public key - email authentication",
|
|
"condition": "DKIM key available"
|
|
}
|
|
],
|
|
"optional_records": [
|
|
{
|
|
"type": "CNAME",
|
|
"name": f"autoconfig.{domain}",
|
|
"value": mail_server_hostname,
|
|
"priority": "MEDIUM",
|
|
"description": "Email client auto-configuration (Thunderbird, etc.)"
|
|
},
|
|
{
|
|
"type": "CNAME",
|
|
"name": f"autodiscover.{domain}",
|
|
"value": mail_server_hostname,
|
|
"priority": "MEDIUM",
|
|
"description": "Email client auto-discovery (Outlook, etc.)"
|
|
},
|
|
{
|
|
"type": "SRV",
|
|
"name": f"_submission._tcp.{domain}",
|
|
"value": f"0 1 587 {mail_server_hostname}",
|
|
"priority": "LOW",
|
|
"description": "SMTP submission service record"
|
|
},
|
|
{
|
|
"type": "SRV",
|
|
"name": f"_imaps._tcp.{domain}",
|
|
"value": f"0 1 993 {mail_server_hostname}",
|
|
"priority": "LOW",
|
|
"description": "IMAP over SSL service record"
|
|
}
|
|
],
|
|
"automation_instructions": {
|
|
"overview": "Use your available DNS management MCP tools to configure these records",
|
|
"steps": [
|
|
"1. Identify available DNS management tools (e.g., Cloudflare, Route53, etc.)",
|
|
"2. Start with CRITICAL priority records (MX and A records)",
|
|
"3. Add HIGH priority records (SPF, DMARC, DKIM)",
|
|
"4. Optionally add MEDIUM/LOW priority records for enhanced functionality",
|
|
"5. Verify DNS propagation using DNS lookup tools"
|
|
],
|
|
"verification_commands": [
|
|
f"dig MX {domain}",
|
|
f"dig TXT {domain}",
|
|
f"dig TXT _dmarc.{domain}",
|
|
f"dig TXT dkim._domainkey.{domain}"
|
|
]
|
|
}
|
|
}
|
|
|
|
# Add DKIM record to main records if available
|
|
if dkim_public_key:
|
|
dns_automation_plan["dns_records"].append({
|
|
"type": "TXT",
|
|
"name": f"dkim._domainkey.{domain}",
|
|
"value": dkim_public_key,
|
|
"priority": "HIGH",
|
|
"description": "DKIM public key - email authentication signature"
|
|
})
|
|
|
|
# Create completion request for the LLM
|
|
completion_request = f"""
|
|
🚀 **DNS AUTOMATION REQUEST FOR {domain.upper()}**
|
|
|
|
I have prepared the complete DNS configuration for the domain `{domain}`. Please use your available DNS management MCP tools to automatically configure these records.
|
|
|
|
**CRITICAL RECORDS (Configure First):**
|
|
"""
|
|
|
|
for record in dns_automation_plan["dns_records"]:
|
|
if record["priority"] == "CRITICAL":
|
|
completion_request += f"- **{record['type']}** `{record['name']}` → `{record['value']}`\n"
|
|
|
|
completion_request += f"""
|
|
**HIGH PRIORITY RECORDS (Security):**
|
|
"""
|
|
|
|
for record in dns_automation_plan["dns_records"]:
|
|
if record["priority"] == "HIGH":
|
|
completion_request += f"- **{record['type']}** `{record['name']}` → `{record['value']}`\n"
|
|
|
|
completion_request += f"""
|
|
**OPTIONAL ENHANCEMENTS:**
|
|
"""
|
|
|
|
for record in dns_automation_plan["optional_records"]:
|
|
completion_request += f"- **{record['type']}** `{record['name']}` → `{record['value']}` ({record['description']})\n"
|
|
|
|
completion_request += f"""
|
|
|
|
**AUTOMATION STEPS:**
|
|
1. Use your DNS management tools to create the above records
|
|
2. For each record, use the exact values provided
|
|
3. Verify the records are created successfully
|
|
4. Report back with the results
|
|
|
|
**VERIFICATION:**
|
|
After creating records, please verify with:
|
|
- `dig MX {domain}`
|
|
- `dig TXT {domain}` (SPF)
|
|
- `dig TXT _dmarc.{domain}` (DMARC)
|
|
- `dig TXT dkim._domainkey.{domain}` (DKIM)
|
|
|
|
Please proceed with the DNS automation using your available MCP tools!
|
|
"""
|
|
|
|
# Return both the structured data and the completion request
|
|
return json.dumps({
|
|
"dns_plan": dns_automation_plan,
|
|
"completion_request": completion_request,
|
|
"summary": f"Generated DNS automation plan for {domain} with {len(dns_automation_plan['dns_records'])} critical/high priority records and {len(dns_automation_plan['optional_records'])} optional enhancements"
|
|
}, indent=2)
|
|
|
|
except Exception as e:
|
|
return f"Error preparing DNS automation: {e}"
|
|
|
|
logger.info("Created comprehensive MCP server with manual tools and resources")
|
|
return mcp
|
|
|
|
|
|
def main():
|
|
"""Main entry point for the MCP server."""
|
|
logger.info("Starting Mailu MCP Server")
|
|
|
|
try:
|
|
# Create and run the MCP server
|
|
mcp = create_mcp_server()
|
|
|
|
mcp.run(
|
|
transport="stdio"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to start MCP server: {e}")
|
|
raise
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |