Add zone file import/export functionality (v1.1.0)

Major new feature: DNS Zone File Management
- Add export_zone_file() method to export domain records as standard zone files
- Add import_zone_file() method to import records from zone file format
- Add comprehensive zone file parser with $TTL and $ORIGIN support
- Add dry-run mode for import validation without making changes
- Add zone file tools to FastMCP server (export_zone_file_tool, import_zone_file_tool)
- Add dns://domains/{domain}/zone-file resource for MCP clients

Features:
- Standard zone file format compliance (BIND, PowerDNS compatible)
- Support for all DNS record types (A, AAAA, CNAME, MX, TXT, NS, SRV)
- Proper handling of quoted strings and record priorities
- Line-by-line error reporting for invalid zone data
- Backup and migration capabilities

This enables easy DNS configuration backup, restoration, and bulk operations.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
Ryan Malloy 2025-07-16 19:29:06 -06:00
parent 509423e650
commit 2d866d275d
6 changed files with 312 additions and 3 deletions

View File

@ -5,6 +5,30 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [1.1.0] - 2025-01-16
### Added
- **Zone File Import/Export** - Major new feature for DNS backup and migration
- `export_zone_file_tool(domain)` - Export domain records as standard DNS zone file format
- `import_zone_file_tool(domain, zone_data, dry_run)` - Import DNS records from zone file format
- `dns://domains/{domain}/zone-file` resource for zone file access
- Support for all standard DNS record types (A, AAAA, CNAME, MX, TXT, NS, SRV)
- Comprehensive zone file parsing with proper handling of $TTL and $ORIGIN directives
- Dry-run mode for import validation without making changes
- Standard zone file format compliance for interoperability
### Features
- **Backup & Migration**: Easy DNS configuration backup and restoration
- **Bulk Operations**: Import multiple records at once from zone files
- **Validation**: Pre-import validation with detailed error reporting
- **Compatibility**: Standard zone file format works with BIND, PowerDNS, and other DNS servers
### Technical
- Added comprehensive zone file parsing engine with quoted string handling
- Proper record type detection and formatting
- Error handling with line-by-line validation feedback
- Support for both tool and resource access patterns
## [1.0.4] - 2025-01-16
### Fixed

View File

@ -139,12 +139,13 @@ vultr-dns-mcp/
- 12 comprehensive DNS management tools
- **Important**: FastMCP's `run()` method is synchronous, not async. Do not wrap with `asyncio.run()`
### MCP Tools (17 total)
### MCP Tools (19 total)
- Domain management: list, create, delete, get details
- DNS record operations: CRUD for all record types
- Validation: Pre-creation validation with suggestions
- Analysis: Configuration analysis with security recommendations
- Setup utilities: Quick website and email DNS configuration
- **Zone File Management**: Import/export DNS records in standard zone file format
- Resource access tools: Tool wrappers for Claude Desktop compatibility
### Enhanced Error Handling

View File

@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "vultr-dns-mcp"
version = "1.0.4"
version = "1.1.0"
description = "A comprehensive Model Context Protocol (MCP) server for managing Vultr DNS records"
readme = "README.md"
license = {text = "MIT"}

View File

@ -1,4 +1,4 @@
"""Version information for vultr-dns-mcp package."""
__version__ = "1.0.4"
__version__ = "1.1.0"
__version_info__ = tuple(int(i) for i in __version__.split(".") if i.isdigit())

View File

@ -190,6 +190,15 @@ def create_vultr_mcp_server(api_key: Optional[str] = None) -> FastMCP:
return records
@mcp.resource("dns://domains/{domain}/zone-file")
async def export_zone_file_resource(domain: str) -> str:
"""Export domain records as standard DNS zone file format.
Args:
domain: The domain name to export
"""
return await vultr_client.export_zone_file(domain)
# Tool wrappers for resources (for compatibility with Claude Desktop)
@mcp.tool
async def list_domains_tool() -> List[Dict[str, Any]]:
@ -244,6 +253,32 @@ def create_vultr_mcp_server(api_key: Optional[str] = None) -> FastMCP:
"""
return await vultr_client.analyze_records(domain)
@mcp.tool
async def export_zone_file_tool(domain: str) -> str:
"""Export domain records as standard DNS zone file format.
Args:
domain: The domain name to export
Returns:
DNS zone file content as string
"""
return await vultr_client.export_zone_file(domain)
@mcp.tool
async def import_zone_file_tool(domain: str, zone_data: str, dry_run: bool = False) -> List[Dict[str, Any]]:
"""Import DNS records from zone file format.
Args:
domain: The domain name to import records to
zone_data: DNS zone file content as string
dry_run: If True, only validate and return what would be created without making changes
Returns:
List of created records or validation results
"""
return await vultr_client.import_zone_file(domain, zone_data, dry_run)
return mcp

View File

@ -189,6 +189,255 @@ class VultrDNSServer:
"""Delete a DNS record."""
return await self._make_request("DELETE", f"/domains/{domain}/records/{record_id}")
# Zone File Management Methods
async def export_zone_file(self, domain: str) -> str:
"""
Export domain records as standard DNS zone file format.
Args:
domain: The domain name to export
Returns:
DNS zone file content as string
"""
# Get domain info and records
domain_info = await self.get_domain(domain)
records = await self.list_records(domain)
# Build zone file content
lines = []
# Zone file header
lines.append(f"; Zone file for {domain}")
lines.append(f"; Generated by vultr-dns-mcp")
lines.append(f"$ORIGIN {domain}.")
lines.append(f"$TTL 3600")
lines.append("")
# Sort records by type for better organization
record_types = ["SOA", "NS", "A", "AAAA", "CNAME", "MX", "TXT", "SRV"]
sorted_records = []
for record_type in record_types:
type_records = [r for r in records if r.get("type") == record_type]
sorted_records.extend(type_records)
# Add any remaining record types not in our list
remaining = [r for r in records if r.get("type") not in record_types]
sorted_records.extend(remaining)
# Convert records to zone file format
for record in sorted_records:
name = record.get("name", "@")
ttl = record.get("ttl", 3600)
record_type = record.get("type")
data = record.get("data", "")
priority = record.get("priority")
# Handle different record types
if record_type == "MX":
line = f"{name}\t{ttl}\tIN\t{record_type}\t{priority}\t{data}"
elif record_type == "SRV":
# SRV format: priority weight port target
srv_parts = data.split()
if len(srv_parts) >= 3:
weight = srv_parts[0] if len(srv_parts) > 3 else "0"
port = srv_parts[-2] if len(srv_parts) > 2 else "80"
target = srv_parts[-1]
line = f"{name}\t{ttl}\tIN\t{record_type}\t{priority}\t{weight}\t{port}\t{target}"
else:
line = f"{name}\t{ttl}\tIN\t{record_type}\t{priority}\t{data}"
elif record_type == "TXT":
# Ensure TXT data is quoted
if not (data.startswith('"') and data.endswith('"')):
data = f'"{data}"'
line = f"{name}\t{ttl}\tIN\t{record_type}\t{data}"
else:
line = f"{name}\t{ttl}\tIN\t{record_type}\t{data}"
lines.append(line)
return "\n".join(lines)
async def import_zone_file(self, domain: str, zone_data: str, dry_run: bool = False) -> List[Dict[str, Any]]:
"""
Import DNS records from zone file format.
Args:
domain: The domain name to import records to
zone_data: DNS zone file content as string
dry_run: If True, only validate and return what would be created
Returns:
List of created records or validation results
"""
results = []
lines = zone_data.strip().split('\n')
current_ttl = 3600
current_origin = domain
for line_num, line in enumerate(lines, 1):
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith(';'):
continue
# Handle $TTL directive
if line.startswith('$TTL'):
try:
current_ttl = int(line.split()[1])
except (IndexError, ValueError):
results.append({
"error": f"Line {line_num}: Invalid $TTL directive",
"line": line
})
continue
# Handle $ORIGIN directive
if line.startswith('$ORIGIN'):
try:
current_origin = line.split()[1].rstrip('.')
except IndexError:
results.append({
"error": f"Line {line_num}: Invalid $ORIGIN directive",
"line": line
})
continue
# Skip SOA records (managed by Vultr)
if '\tSOA\t' in line or ' SOA ' in line:
continue
# Parse DNS record
try:
record = self._parse_zone_line(line, current_ttl, current_origin)
if record:
if dry_run:
results.append({
"action": "create",
"type": record["type"],
"name": record["name"],
"data": record["data"],
"ttl": record.get("ttl"),
"priority": record.get("priority")
})
else:
# Create the record
created = await self.create_record(
domain=domain,
record_type=record["type"],
name=record["name"],
data=record["data"],
ttl=record.get("ttl"),
priority=record.get("priority")
)
results.append(created)
except Exception as e:
results.append({
"error": f"Line {line_num}: {str(e)}",
"line": line
})
return results
def _parse_zone_line(self, line: str, default_ttl: int, origin: str) -> Optional[Dict[str, Any]]:
"""
Parse a single zone file line into a DNS record.
Args:
line: Zone file line to parse
default_ttl: Default TTL if not specified
origin: Current origin domain
Returns:
Dictionary with record data or None if invalid
"""
# Split line into parts, handling quoted strings
parts = []
current_part = ""
in_quotes = False
for char in line:
if char == '"' and (not current_part or current_part[-1] != '\\'):
in_quotes = not in_quotes
current_part += char
elif char in [' ', '\t'] and not in_quotes:
if current_part:
parts.append(current_part)
current_part = ""
else:
current_part += char
if current_part:
parts.append(current_part)
if len(parts) < 4:
return None
# Parse parts: name [ttl] [class] type data [data...]
name = parts[0]
record_type = None
data_start_idx = 0
ttl = default_ttl
# Find the record type (should be one of the standard types)
valid_types = ["A", "AAAA", "CNAME", "MX", "TXT", "NS", "SRV", "PTR"]
for i, part in enumerate(parts[1:], 1):
if part.upper() in valid_types:
record_type = part.upper()
data_start_idx = i + 1
break
elif part.upper() == "IN":
continue # Skip class
elif part.isdigit():
ttl = int(part)
if not record_type or data_start_idx >= len(parts):
return None
# Handle @ symbol for root domain
if name == "@":
name = ""
elif name.endswith("."):
name = name[:-1] # Remove trailing dot
# Get record data
data_parts = parts[data_start_idx:]
priority = None
if record_type == "MX":
if len(data_parts) >= 2:
priority = int(data_parts[0])
data = data_parts[1]
else:
return None
elif record_type == "SRV":
if len(data_parts) >= 4:
priority = int(data_parts[0])
weight = data_parts[1]
port = data_parts[2]
target = data_parts[3]
data = f"{weight} {port} {target}"
else:
return None
elif record_type == "TXT":
# Join all parts and remove quotes
data = " ".join(data_parts)
if data.startswith('"') and data.endswith('"'):
data = data[1:-1]
else:
data = " ".join(data_parts)
return {
"name": name,
"type": record_type,
"data": data,
"ttl": ttl,
"priority": priority
}
def create_mcp_server(api_key: Optional[str] = None) -> Server:
"""