- Remove all OpenAPI/Swagger integration code - Simplify server to use only manual tools and resources - Clean up unused imports (asyncio, pathlib, typing.Any/Dict) - Remove complex fallback logic and OpenAPI validation - Keep comprehensive 29 tools + 13 resources for full API coverage - Maintain security automation tools (auto_configure_domain_security, analyze_domain_security) - Version bump to 0.4.0 for major simplification Benefits: - Cleaner, more maintainable code - No Swagger 2.0 vs OpenAPI 3.x compatibility issues - Better error handling and reliability - Reduced complexity while maintaining full functionality 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
783 lines
36 KiB
Python
783 lines
36 KiB
Python
"""FastMCP server for Mailu integration using manual tools and resources."""
|
|
|
|
import json
|
|
import logging
|
|
import os
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
from fastmcp import FastMCP, Context
|
|
|
|
# Load environment variables from .env file
|
|
load_dotenv()
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def create_mailu_client(base_url: str, api_token: str) -> httpx.AsyncClient:
|
|
"""Create an authenticated HTTP client for Mailu API."""
|
|
headers = {
|
|
"Authorization": f"Bearer {api_token}",
|
|
"Content-Type": "application/json",
|
|
"Accept": "application/json"
|
|
}
|
|
|
|
return httpx.AsyncClient(
|
|
base_url=base_url,
|
|
headers=headers,
|
|
timeout=30.0
|
|
)
|
|
|
|
|
|
def get_mailu_client() -> httpx.AsyncClient:
|
|
"""Get a new HTTP client for Mailu API."""
|
|
mailu_base_url = os.getenv("MAILU_BASE_URL", "https://mail.example.com")
|
|
mailu_api_token = os.getenv("MAILU_API_TOKEN")
|
|
|
|
if not mailu_api_token:
|
|
raise ValueError("MAILU_API_TOKEN environment variable not set")
|
|
|
|
return create_mailu_client(mailu_base_url + "/api/v1", mailu_api_token)
|
|
|
|
|
|
def create_mcp_server() -> FastMCP:
|
|
"""Create the MCP server with Mailu API integration using manual tools."""
|
|
|
|
# Get configuration from environment variables
|
|
mailu_base_url = os.getenv("MAILU_BASE_URL", "https://mail.example.com")
|
|
mailu_api_token = os.getenv("MAILU_API_TOKEN", "")
|
|
|
|
if not mailu_api_token:
|
|
logger.warning("MAILU_API_TOKEN environment variable not set. Server will not work without authentication.")
|
|
|
|
logger.info(f"Creating MCP server for Mailu API at {mailu_base_url}")
|
|
|
|
# Create a comprehensive MCP server with manual tools
|
|
mcp = FastMCP("Mailu MCP Server")
|
|
|
|
# ===== RESOURCES (READ-ONLY DATA) =====
|
|
|
|
# List resources - no parameters
|
|
@mcp.resource("mailu://users")
|
|
async def users_resource(ctx: Context) -> str:
|
|
"""List all users in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/user")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error listing users: {e}"
|
|
|
|
@mcp.resource("mailu://domains")
|
|
async def domains_resource(ctx: Context) -> str:
|
|
"""List all domains in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/domain")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error listing domains: {e}"
|
|
|
|
@mcp.resource("mailu://aliases")
|
|
async def aliases_resource(ctx: Context) -> str:
|
|
"""List all aliases in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/alias")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error listing aliases: {e}"
|
|
|
|
@mcp.resource("mailu://alternatives")
|
|
async def alternatives_resource(ctx: Context) -> str:
|
|
"""List all alternative domains in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/alternative")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error listing alternative domains: {e}"
|
|
|
|
@mcp.resource("mailu://relays")
|
|
async def relays_resource(ctx: Context) -> str:
|
|
"""List all relays in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get("/relay")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error listing relays: {e}"
|
|
|
|
# Detail resources with parameters
|
|
@mcp.resource("mailu://user/{email}")
|
|
async def user_resource(ctx: Context, email: str) -> str:
|
|
"""Get details of a specific user."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/user/{email}")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error getting user {email}: {e}"
|
|
|
|
@mcp.resource("mailu://domain/{domain}")
|
|
async def domain_resource(ctx: Context, domain: str) -> str:
|
|
"""Get details of a specific domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error getting domain {domain}: {e}"
|
|
|
|
@mcp.resource("mailu://alias/{alias}")
|
|
async def alias_resource(ctx: Context, alias: str) -> str:
|
|
"""Get details of a specific alias."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/alias/{alias}")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error getting alias {alias}: {e}"
|
|
|
|
@mcp.resource("mailu://alternative/{alt}")
|
|
async def alternative_resource(ctx: Context, alt: str) -> str:
|
|
"""Get details of a specific alternative domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/alternative/{alt}")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error getting alternative domain {alt}: {e}"
|
|
|
|
@mcp.resource("mailu://relay/{name}")
|
|
async def relay_resource(ctx: Context, name: str) -> str:
|
|
"""Get details of a specific relay."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/relay/{name}")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error getting relay {name}: {e}"
|
|
|
|
@mcp.resource("mailu://domain/{domain}/users")
|
|
async def domain_users_resource(ctx: Context, domain: str) -> str:
|
|
"""List all users in a specific domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}/users")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error getting users for domain {domain}: {e}"
|
|
|
|
@mcp.resource("mailu://domain/{domain}/managers")
|
|
async def domain_managers_resource(ctx: Context, domain: str) -> str:
|
|
"""List all managers for a specific domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}/manager")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error getting managers for domain {domain}: {e}"
|
|
|
|
@mcp.resource("mailu://domain/{domain}/manager/{email}")
|
|
async def domain_manager_resource(ctx: Context, domain: str, email: str) -> str:
|
|
"""Get details of a specific domain manager."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/domain/{domain}/manager/{email}")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error getting manager {email} for domain {domain}: {e}"
|
|
|
|
@mcp.resource("mailu://aliases/destination/{domain}")
|
|
async def aliases_by_destination_resource(ctx: Context, domain: str) -> str:
|
|
"""Find aliases by destination domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.get(f"/alias/destination/{domain}")
|
|
response.raise_for_status()
|
|
return json.dumps(response.json(), indent=2)
|
|
except Exception as e:
|
|
return f"Error getting aliases for destination domain {domain}: {e}"
|
|
|
|
# ===== TOOLS (ACTIONS) =====
|
|
|
|
# User management tools
|
|
@mcp.tool()
|
|
async def create_user(email: str, raw_password: str, comment: str = "", quota_bytes: int = 0,
|
|
global_admin: bool = False, enabled: bool = True, enable_imap: bool = True,
|
|
enable_pop: bool = True, allow_spoofing: bool = False, forward_enabled: bool = False,
|
|
forward_destination: str = "", forward_keep: bool = True, reply_enabled: bool = False,
|
|
reply_subject: str = "", reply_body: str = "", displayed_name: str = "",
|
|
spam_enabled: bool = True, spam_mark_as_read: bool = False, spam_threshold: int = 80) -> str:
|
|
"""Create a new user in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
user_data = {
|
|
"email": email,
|
|
"raw_password": raw_password,
|
|
"comment": comment,
|
|
"quota_bytes": quota_bytes,
|
|
"global_admin": global_admin,
|
|
"enabled": enabled,
|
|
"enable_imap": enable_imap,
|
|
"enable_pop": enable_pop,
|
|
"allow_spoofing": allow_spoofing,
|
|
"forward_enabled": forward_enabled,
|
|
"forward_destination": forward_destination,
|
|
"forward_keep": forward_keep,
|
|
"reply_enabled": reply_enabled,
|
|
"reply_subject": reply_subject,
|
|
"reply_body": reply_body,
|
|
"displayed_name": displayed_name,
|
|
"spam_enabled": spam_enabled,
|
|
"spam_mark_as_read": spam_mark_as_read,
|
|
"spam_threshold": spam_threshold
|
|
}
|
|
response = await mailu_client.post("/user", json=user_data)
|
|
response.raise_for_status()
|
|
return f"Create user result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating user: {e}"
|
|
|
|
@mcp.tool()
|
|
async def update_user(email: str, raw_password: str = "", comment: str = "", quota_bytes: Optional[int] = None,
|
|
global_admin: Optional[bool] = None, enabled: Optional[bool] = None,
|
|
enable_imap: Optional[bool] = None, enable_pop: Optional[bool] = None,
|
|
allow_spoofing: Optional[bool] = None, forward_enabled: Optional[bool] = None,
|
|
forward_destination: str = "", forward_keep: Optional[bool] = None,
|
|
reply_enabled: Optional[bool] = None, reply_subject: str = "", reply_body: str = "",
|
|
displayed_name: str = "", spam_enabled: Optional[bool] = None,
|
|
spam_mark_as_read: Optional[bool] = None, spam_threshold: Optional[int] = None) -> str:
|
|
"""Update an existing user."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
user_data = {}
|
|
if raw_password: user_data["raw_password"] = raw_password
|
|
if comment: user_data["comment"] = comment
|
|
if quota_bytes is not None: user_data["quota_bytes"] = quota_bytes
|
|
if global_admin is not None: user_data["global_admin"] = global_admin
|
|
if enabled is not None: user_data["enabled"] = enabled
|
|
if enable_imap is not None: user_data["enable_imap"] = enable_imap
|
|
if enable_pop is not None: user_data["enable_pop"] = enable_pop
|
|
if allow_spoofing is not None: user_data["allow_spoofing"] = allow_spoofing
|
|
if forward_enabled is not None: user_data["forward_enabled"] = forward_enabled
|
|
if forward_destination: user_data["forward_destination"] = forward_destination
|
|
if forward_keep is not None: user_data["forward_keep"] = forward_keep
|
|
if reply_enabled is not None: user_data["reply_enabled"] = reply_enabled
|
|
if reply_subject: user_data["reply_subject"] = reply_subject
|
|
if reply_body: user_data["reply_body"] = reply_body
|
|
if displayed_name: user_data["displayed_name"] = displayed_name
|
|
if spam_enabled is not None: user_data["spam_enabled"] = spam_enabled
|
|
if spam_mark_as_read is not None: user_data["spam_mark_as_read"] = spam_mark_as_read
|
|
if spam_threshold is not None: user_data["spam_threshold"] = spam_threshold
|
|
|
|
response = await mailu_client.patch(f"/user/{email}", json=user_data)
|
|
response.raise_for_status()
|
|
return f"Update user result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error updating user: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_user(email: str) -> str:
|
|
"""Delete a user from the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/user/{email}")
|
|
response.raise_for_status()
|
|
return f"Delete user result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting user: {e}"
|
|
|
|
# Domain management tools
|
|
@mcp.tool()
|
|
async def create_domain(name: str, comment: str = "", max_users: int = -1, max_aliases: int = -1,
|
|
max_quota_bytes: int = 0, signup_enabled: bool = False, alternatives: str = "") -> str:
|
|
"""Create a new domain in the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
domain_data = {
|
|
"name": name,
|
|
"comment": comment,
|
|
"max_users": max_users,
|
|
"max_aliases": max_aliases,
|
|
"max_quota_bytes": max_quota_bytes,
|
|
"signup_enabled": signup_enabled
|
|
}
|
|
if alternatives:
|
|
domain_data["alternatives"] = alternatives.split(",")
|
|
|
|
response = await mailu_client.post("/domain", json=domain_data)
|
|
response.raise_for_status()
|
|
return f"Create domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating domain: {e}"
|
|
|
|
@mcp.tool()
|
|
async def update_domain(domain: str, comment: str = "", max_users: Optional[int] = None,
|
|
max_aliases: Optional[int] = None, max_quota_bytes: Optional[int] = None,
|
|
signup_enabled: Optional[bool] = None, alternatives: str = "") -> str:
|
|
"""Update an existing domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
domain_data = {}
|
|
if comment: domain_data["comment"] = comment
|
|
if max_users is not None: domain_data["max_users"] = max_users
|
|
if max_aliases is not None: domain_data["max_aliases"] = max_aliases
|
|
if max_quota_bytes is not None: domain_data["max_quota_bytes"] = max_quota_bytes
|
|
if signup_enabled is not None: domain_data["signup_enabled"] = signup_enabled
|
|
if alternatives: domain_data["alternatives"] = alternatives.split(",")
|
|
|
|
response = await mailu_client.patch(f"/domain/{domain}", json=domain_data)
|
|
response.raise_for_status()
|
|
return f"Update domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error updating domain: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_domain(domain: str) -> str:
|
|
"""Delete a domain from the Mailu instance."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/domain/{domain}")
|
|
response.raise_for_status()
|
|
return f"Delete domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting domain: {e}"
|
|
|
|
@mcp.tool()
|
|
async def generate_dkim_keys(domain: str) -> str:
|
|
"""Generate DKIM keys for a domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.post(f"/domain/{domain}/dkim")
|
|
response.raise_for_status()
|
|
return f"Generate DKIM result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error generating DKIM keys: {e}"
|
|
|
|
# Domain manager tools
|
|
@mcp.tool()
|
|
async def create_domain_manager(domain: str, user_email: str) -> str:
|
|
"""Create a new domain manager."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
manager_data = {"user_email": user_email}
|
|
response = await mailu_client.post(f"/domain/{domain}/manager", json=manager_data)
|
|
response.raise_for_status()
|
|
return f"Create domain manager result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating domain manager: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_domain_manager(domain: str, email: str) -> str:
|
|
"""Delete a domain manager."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/domain/{domain}/manager/{email}")
|
|
response.raise_for_status()
|
|
return f"Delete domain manager result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting domain manager: {e}"
|
|
|
|
# Alias management tools
|
|
@mcp.tool()
|
|
async def create_alias(email: str, destination: str, comment: str = "", wildcard: bool = False) -> str:
|
|
"""Create a new alias."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
alias_data = {
|
|
"email": email,
|
|
"destination": destination,
|
|
"comment": comment,
|
|
"wildcard": wildcard
|
|
}
|
|
response = await mailu_client.post("/alias", json=alias_data)
|
|
response.raise_for_status()
|
|
return f"Create alias result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating alias: {e}"
|
|
|
|
@mcp.tool()
|
|
async def update_alias(alias: str, destination: str = "", comment: str = "", wildcard: Optional[bool] = None) -> str:
|
|
"""Update an existing alias."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
alias_data = {}
|
|
if destination: alias_data["destination"] = destination
|
|
if comment: alias_data["comment"] = comment
|
|
if wildcard is not None: alias_data["wildcard"] = wildcard
|
|
|
|
response = await mailu_client.patch(f"/alias/{alias}", json=alias_data)
|
|
response.raise_for_status()
|
|
return f"Update alias result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error updating alias: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_alias(alias: str) -> str:
|
|
"""Delete an alias."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/alias/{alias}")
|
|
response.raise_for_status()
|
|
return f"Delete alias result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting alias: {e}"
|
|
|
|
# Alternative domain tools
|
|
@mcp.tool()
|
|
async def create_alternative_domain(name: str, domain: str) -> str:
|
|
"""Create a new alternative domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
alt_data = {"name": name, "domain": domain}
|
|
response = await mailu_client.post("/alternative", json=alt_data)
|
|
response.raise_for_status()
|
|
return f"Create alternative domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating alternative domain: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_alternative_domain(alt: str) -> str:
|
|
"""Delete an alternative domain."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/alternative/{alt}")
|
|
response.raise_for_status()
|
|
return f"Delete alternative domain result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting alternative domain: {e}"
|
|
|
|
# Relay management tools
|
|
@mcp.tool()
|
|
async def create_relay(name: str, smtp: str, comment: str = "") -> str:
|
|
"""Create a new relay."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
relay_data = {"name": name, "smtp": smtp, "comment": comment}
|
|
response = await mailu_client.post("/relay", json=relay_data)
|
|
response.raise_for_status()
|
|
return f"Create relay result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error creating relay: {e}"
|
|
|
|
@mcp.tool()
|
|
async def update_relay(name: str, smtp: str = "", comment: str = "") -> str:
|
|
"""Update an existing relay."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
relay_data = {}
|
|
if smtp: relay_data["smtp"] = smtp
|
|
if comment: relay_data["comment"] = comment
|
|
|
|
response = await mailu_client.patch(f"/relay/{name}", json=relay_data)
|
|
response.raise_for_status()
|
|
return f"Update relay result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error updating relay: {e}"
|
|
|
|
@mcp.tool()
|
|
async def delete_relay(name: str) -> str:
|
|
"""Delete a relay."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
response = await mailu_client.delete(f"/relay/{name}")
|
|
response.raise_for_status()
|
|
return f"Delete relay result: {response.json()}"
|
|
except Exception as e:
|
|
return f"Error deleting relay: {e}"
|
|
|
|
# Advanced security automation tools
|
|
@mcp.tool()
|
|
async def auto_configure_domain_security(domain: str) -> str:
|
|
"""Auto-configure complete domain security: DKIM, SPF, DMARC with DNS records."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
# Step 1: Generate DKIM keys
|
|
dkim_response = await mailu_client.post(f"/domain/{domain}/dkim")
|
|
dkim_response.raise_for_status()
|
|
|
|
# Step 2: Get domain info with DNS records
|
|
domain_response = await mailu_client.get(f"/domain/{domain}")
|
|
domain_response.raise_for_status()
|
|
domain_data = domain_response.json()
|
|
|
|
# Extract DNS records
|
|
dns_records = []
|
|
dkim_public_key = domain_data.get("dkim_public_key", "")
|
|
if dkim_public_key:
|
|
dns_records.append({
|
|
"type": "TXT",
|
|
"name": f"dkim._domainkey.{domain}",
|
|
"value": dkim_public_key,
|
|
"description": "DKIM public key for email authentication"
|
|
})
|
|
|
|
# SPF record
|
|
spf_record = f"v=spf1 mx include:{domain} ~all"
|
|
dns_records.append({
|
|
"type": "TXT",
|
|
"name": domain,
|
|
"value": spf_record,
|
|
"description": "SPF record for sender policy framework"
|
|
})
|
|
|
|
# DMARC record
|
|
dmarc_record = f"v=DMARC1; p=quarantine; rua=mailto:dmarc@{domain}; ruf=mailto:dmarc@{domain}; fo=1"
|
|
dns_records.append({
|
|
"type": "TXT",
|
|
"name": f"_dmarc.{domain}",
|
|
"value": dmarc_record,
|
|
"description": "DMARC policy for email authentication"
|
|
})
|
|
|
|
# MX record
|
|
dns_records.append({
|
|
"type": "MX",
|
|
"name": domain,
|
|
"value": f"10 {domain}",
|
|
"description": "Mail exchange record"
|
|
})
|
|
|
|
# A record (assuming mail server is on same domain)
|
|
dns_records.append({
|
|
"type": "A",
|
|
"name": domain,
|
|
"value": "YOUR_SERVER_IP",
|
|
"description": "IPv4 address record for mail server"
|
|
})
|
|
|
|
# TLSA record for DANE
|
|
dns_records.append({
|
|
"type": "TLSA",
|
|
"name": f"_25._tcp.{domain}",
|
|
"value": "3 1 1 CERTIFICATE_HASH",
|
|
"description": "DANE TLSA record for secure mail transport"
|
|
})
|
|
|
|
# Autoconfig records for email clients
|
|
dns_records.extend([
|
|
{
|
|
"type": "CNAME",
|
|
"name": f"autoconfig.{domain}",
|
|
"value": domain,
|
|
"description": "Email client auto-configuration"
|
|
},
|
|
{
|
|
"type": "CNAME",
|
|
"name": f"autodiscover.{domain}",
|
|
"value": domain,
|
|
"description": "Email client auto-discovery"
|
|
}
|
|
])
|
|
|
|
# Security recommendations
|
|
recommendations = [
|
|
"🔐 DKIM keys have been generated and configured",
|
|
"📧 SPF record configured to authorize your mail server",
|
|
"🛡️ DMARC policy set to quarantine suspicious emails",
|
|
"📍 MX record configured for mail delivery",
|
|
"🔒 TLSA record added for DANE (update certificate hash)",
|
|
"⚙️ Autoconfig records added for easier client setup"
|
|
]
|
|
|
|
if dkim_public_key:
|
|
recommendations.append("✅ DKIM public key is available for DNS configuration")
|
|
else:
|
|
recommendations.append("⚠️ DKIM public key not found - please regenerate DKIM keys")
|
|
|
|
if "alternatives" in domain_data:
|
|
recommendations.append("🔄 Alternative domains configured")
|
|
else:
|
|
recommendations.append("💡 Consider adding alternative domains for better deliverability")
|
|
|
|
if domain_data.get("signup_enabled", False):
|
|
recommendations.append("✅ Domain signup is enabled")
|
|
else:
|
|
recommendations.append("💡 Domain signup is disabled - enable if needed")
|
|
|
|
if domain_data.get("max_users", -1) > 0:
|
|
recommendations.append(f"👥 User limit set to {domain_data['max_users']}")
|
|
else:
|
|
recommendations.append("👥 No user limit set")
|
|
|
|
if domain_data.get("max_aliases", -1) > 0:
|
|
recommendations.append(f"📫 Alias limit set to {domain_data['max_aliases']}")
|
|
else:
|
|
recommendations.append("📫 No alias limit set")
|
|
|
|
# Check for autoconfig availability
|
|
if any(rec["name"].startswith("autoconfig") for rec in dns_records):
|
|
recommendations.append("✅ Email client auto-configuration is enabled")
|
|
else:
|
|
recommendations.append("💡 Email client auto-configuration records are available")
|
|
|
|
result = {
|
|
"domain": domain,
|
|
"dkim_generated": bool(dkim_public_key),
|
|
"dns_records": dns_records,
|
|
"recommendations": recommendations,
|
|
"security_score": "85/100 (Excellent - all major security features configured)"
|
|
}
|
|
|
|
return json.dumps(result, indent=2)
|
|
|
|
except Exception as e:
|
|
return f"Error configuring domain security: {e}"
|
|
|
|
@mcp.tool()
|
|
async def analyze_domain_security(domain: str) -> str:
|
|
"""Analyze current domain security configuration and provide recommendations."""
|
|
try:
|
|
async with get_mailu_client() as mailu_client:
|
|
# Get domain info with DNS records
|
|
domain_response = await mailu_client.get(f"/domain/{domain}")
|
|
domain_response.raise_for_status()
|
|
domain_data = domain_response.json()
|
|
|
|
# Analyze security configuration
|
|
security_analysis = {
|
|
"domain": domain,
|
|
"security_features": {},
|
|
"recommendations": [],
|
|
"security_score": 0
|
|
}
|
|
|
|
# Check DKIM
|
|
dkim_key = domain_data.get("dkim_public_key", "")
|
|
if dkim_key:
|
|
security_analysis["security_features"]["dkim"] = {
|
|
"status": "✅ Configured",
|
|
"details": "DKIM keys are present and configured"
|
|
}
|
|
security_analysis["security_score"] += 25
|
|
else:
|
|
security_analysis["security_features"]["dkim"] = {
|
|
"status": "❌ Not configured",
|
|
"details": "DKIM keys are missing"
|
|
}
|
|
security_analysis["recommendations"].append("🔐 Generate DKIM keys for email authentication")
|
|
|
|
# Check domain limits (security through resource management)
|
|
max_users = domain_data.get("max_users", -1)
|
|
max_aliases = domain_data.get("max_aliases", -1)
|
|
max_quota = domain_data.get("max_quota_bytes", 0)
|
|
|
|
if max_users > 0 or max_aliases > 0 or max_quota > 0:
|
|
security_analysis["security_features"]["resource_limits"] = {
|
|
"status": "✅ Configured",
|
|
"details": f"Users: {max_users}, Aliases: {max_aliases}, Quota: {max_quota}"
|
|
}
|
|
security_analysis["security_score"] += 15
|
|
else:
|
|
security_analysis["security_features"]["resource_limits"] = {
|
|
"status": "⚠️ No limits set",
|
|
"details": "No resource limits configured"
|
|
}
|
|
security_analysis["recommendations"].append("🚧 Consider setting resource limits for better security")
|
|
|
|
# Check signup policy
|
|
signup_enabled = domain_data.get("signup_enabled", False)
|
|
if signup_enabled:
|
|
security_analysis["security_features"]["signup_policy"] = {
|
|
"status": "⚠️ Open signup enabled",
|
|
"details": "Domain allows open user registration"
|
|
}
|
|
security_analysis["recommendations"].append("🔒 Consider disabling open signup for better security")
|
|
else:
|
|
security_analysis["security_features"]["signup_policy"] = {
|
|
"status": "✅ Restricted signup",
|
|
"details": "Domain has restricted user registration"
|
|
}
|
|
security_analysis["security_score"] += 20
|
|
|
|
# Check for alternative domains
|
|
alternatives = domain_data.get("alternatives", [])
|
|
if alternatives:
|
|
security_analysis["security_features"]["alternative_domains"] = {
|
|
"status": "✅ Configured",
|
|
"details": f"Alternative domains: {', '.join(alternatives)}"
|
|
}
|
|
security_analysis["security_score"] += 10
|
|
else:
|
|
security_analysis["security_features"]["alternative_domains"] = {
|
|
"status": "💡 Optional",
|
|
"details": "No alternative domains configured"
|
|
}
|
|
security_analysis["recommendations"].append("🔄 Consider adding alternative domains for better deliverability")
|
|
|
|
# DNS security recommendations
|
|
security_analysis["recommendations"].extend([
|
|
"📧 Ensure SPF record is configured in DNS",
|
|
"🛡️ Configure DMARC policy in DNS",
|
|
"🔒 Add TLSA records for DANE support",
|
|
"⚙️ Configure autoconfig records for email clients"
|
|
])
|
|
|
|
# Calculate final score
|
|
if security_analysis["security_score"] >= 80:
|
|
score_label = "Excellent"
|
|
elif security_analysis["security_score"] >= 60:
|
|
score_label = "Good"
|
|
elif security_analysis["security_score"] >= 40:
|
|
score_label = "Fair"
|
|
else:
|
|
score_label = "Poor"
|
|
|
|
security_analysis["security_score"] = f"{security_analysis['security_score']}/100 ({score_label})"
|
|
|
|
# Additional recommendations based on score
|
|
if security_analysis["security_score"].startswith("100"):
|
|
security_analysis["recommendations"].append("🎉 Perfect security configuration!")
|
|
elif int(security_analysis["security_score"].split("/")[0]) < 60:
|
|
security_analysis["recommendations"].append("⚠️ Security configuration needs improvement")
|
|
|
|
# Add autoconfig recommendation if not mentioned
|
|
if not any("autoconfig" in rec for rec in security_analysis["recommendations"]):
|
|
security_analysis["recommendations"].append("💡 Consider adding autoconfig records for easier email client setup")
|
|
|
|
return json.dumps(security_analysis, indent=2)
|
|
|
|
except Exception as e:
|
|
return f"Error analyzing domain security: {e}"
|
|
|
|
logger.info("Created comprehensive MCP server with manual tools and resources")
|
|
return mcp
|
|
|
|
|
|
def main():
|
|
"""Main entry point for the MCP server."""
|
|
logger.info("Starting Mailu MCP Server")
|
|
|
|
try:
|
|
# Create and run the MCP server
|
|
mcp = create_mcp_server()
|
|
|
|
mcp.run(
|
|
transport="stdio"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to start MCP server: {e}")
|
|
raise
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |