diff --git a/README.md b/README.md index 170a8a7..f5da367 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ -# Vultr DNS MCP +# Vultr MCP -A comprehensive Model Context Protocol (MCP) server for managing Vultr DNS records through natural language interfaces. +A comprehensive Model Context Protocol (MCP) server for managing Vultr services through natural language interfaces. [![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/) [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT) @@ -8,14 +8,47 @@ A comprehensive Model Context Protocol (MCP) server for managing Vultr DNS recor ## Features -- **Complete MCP Server**: Full Model Context Protocol implementation with 12 tools and 3 resources -- **Comprehensive DNS Management**: Support for all major record types (A, AAAA, CNAME, MX, TXT, NS, SRV) -- **Intelligent Validation**: Pre-creation validation with helpful suggestions and warnings -- **Configuration Analysis**: DNS setup analysis with security recommendations -- **CLI Interface**: Complete command-line tool for direct DNS operations +- **Complete MCP Server**: Full Model Context Protocol implementation with 70+ tools across 8 service modules +- **Comprehensive Service Coverage**: + - **DNS Management**: Full DNS record management (A, AAAA, CNAME, MX, TXT, NS, SRV) + - **Instance Management**: Create, manage, and control compute instances + - **SSH Keys**: Manage SSH keys for secure access + - **Backups**: Create and manage instance backups + - **Firewall**: Configure firewall groups and rules + - **Snapshots**: Create and manage instance snapshots + - **Regions**: Query region availability and capabilities + - **Reserved IPs**: Manage static IP addresses +- **Smart Identifier Resolution**: Use human-readable names instead of UUIDs (e.g., "web-server" instead of UUID) +- **Zone File Import/Export**: Standard zone file format support for bulk DNS operations +- **Intelligent Validation**: Pre-creation validation with helpful suggestions +- **CLI Interface**: Complete command-line tool for direct operations - **High-Level Client**: Convenient Python API for common operations - **Modern Development**: Fast development workflow with uv support +## Smart Identifier Resolution + +One of the key features is **automatic UUID lookup** across all services. Instead of requiring UUIDs, you can use human-readable identifiers: + +- **Instances**: Use label or hostname instead of UUID +- **SSH Keys**: Use key name instead of UUID +- **Firewall Groups**: Use description instead of UUID +- **Snapshots**: Use description instead of UUID +- **Reserved IPs**: Use IP address instead of UUID + +### Examples + +```bash +# Traditional approach (with UUIDs) +mcp-vultr instances stop cb676a46-66fd-4dfb-b839-443f2e6c0b60 +mcp-vultr firewall rules list 5f2a4b6c-7b8d-4e9f-a1b2-3c4d5e6f7a8b + +# Smart approach (with names) +mcp-vultr instances stop web-server +mcp-vultr firewall rules list production-servers +``` + +The system uses **exact matching** to ensure safety - if multiple resources have similar names, you'll get an error rather than operating on the wrong resource. + ## Quick Start ### Installation @@ -34,38 +67,58 @@ pip install mcp-vultr # Set your Vultr API key export VULTR_API_KEY="your-api-key" -# List domains +# DNS Management mcp-vultr domains list - -# List DNS records mcp-vultr records list example.com - -# Set up basic website DNS mcp-vultr setup-website example.com 192.168.1.100 +# Instance Management (with smart name resolution) +mcp-vultr instances list +mcp-vultr instances get web-server # Uses name instead of UUID +mcp-vultr instances stop web-server + +# SSH Key Management +mcp-vultr ssh-keys list +mcp-vultr ssh-keys add "laptop" "ssh-rsa AAAAB3..." + +# Firewall Management +mcp-vultr firewall groups list +mcp-vultr firewall rules list web-servers # Uses description instead of UUID + # Run as MCP server -uv run python -m mcp_vultr.server +vultr-mcp-server ``` ### Python API ```python import asyncio -from mcp_vultr import VultrDNSClient +from mcp_vultr import VultrDNSClient, VultrDNSServer async def main(): - client = VultrDNSClient("your-api-key") + # DNS-specific client + dns_client = VultrDNSClient("your-api-key") # List domains - domains = await client.domains() + domains = await dns_client.domains() # Add DNS records - await client.add_a_record("example.com", "www", "192.168.1.100") - await client.add_mx_record("example.com", "@", "mail.example.com", 10) + await dns_client.add_a_record("example.com", "www", "192.168.1.100") + await dns_client.add_mx_record("example.com", "@", "mail.example.com", 10) # Get domain summary - summary = await client.get_domain_summary("example.com") + summary = await dns_client.get_domain_summary("example.com") print(f"Domain has {summary['total_records']} records") + + # Full API client for all services + vultr = VultrDNSServer("your-api-key") + + # Smart identifier resolution - use names instead of UUIDs! + instance = await vultr.get_instance("web-server") # By label + ssh_key = await vultr.get_ssh_key("laptop-key") # By name + firewall = await vultr.get_firewall_group("production") # By description + snapshot = await vultr.get_snapshot("backup-2024-01") # By description + reserved_ip = await vultr.get_reserved_ip("192.168.1.100") # By IP asyncio.run(main()) ``` @@ -135,19 +188,82 @@ python run_tests.py --all-checks ## MCP Tools Available -| Tool | Description | -|------|-------------| -| `list_dns_domains` | List all DNS domains | -| `get_dns_domain` | Get domain details | -| `create_dns_domain` | Create new domain | -| `delete_dns_domain` | Delete domain and all records | -| `list_dns_records` | List records for a domain | -| `get_dns_record` | Get specific record details | -| `create_dns_record` | Create new DNS record | -| `update_dns_record` | Update existing record | -| `delete_dns_record` | Delete DNS record | -| `validate_dns_record` | Validate record before creation | -| `analyze_dns_records` | Analyze domain configuration | +The MCP server provides 70+ tools across 8 service modules. All tools support **smart identifier resolution** - you can use human-readable names instead of UUIDs! + +### DNS Management (12 tools) +- `dns_list_domains` - List all DNS domains +- `dns_get_domain` - Get domain details +- `dns_create_domain` - Create new domain +- `dns_delete_domain` - Delete domain and all records +- `dns_list_records` - List records for a domain +- `dns_create_record` - Create new DNS record +- `dns_update_record` - Update existing record +- `dns_delete_record` - Delete DNS record +- `dns_validate_record` - Validate record before creation +- `dns_analyze_records` - Analyze domain configuration +- `dns_import_zone_file` - Import DNS records from zone file +- `dns_export_zone_file` - Export DNS records to zone file + +### Instance Management (9 tools) +- `instances_list` - List all instances +- `instances_get` - Get instance details (**smart**: by label, hostname, or UUID) +- `instances_create` - Create new instance +- `instances_update` - Update instance configuration +- `instances_delete` - Delete instance (**smart**: by label, hostname, or UUID) +- `instances_start` - Start a stopped instance (**smart**: by label, hostname, or UUID) +- `instances_stop` - Stop a running instance (**smart**: by label, hostname, or UUID) +- `instances_reboot` - Reboot instance (**smart**: by label, hostname, or UUID) +- `instances_reinstall` - Reinstall instance OS (**smart**: by label, hostname, or UUID) + +### SSH Key Management (5 tools) +- `ssh_keys_list` - List all SSH keys +- `ssh_keys_get` - Get SSH key details (**smart**: by name or UUID) +- `ssh_keys_create` - Add new SSH key +- `ssh_keys_update` - Update SSH key (**smart**: by name or UUID) +- `ssh_keys_delete` - Remove SSH key (**smart**: by name or UUID) + +### Backup Management (2 tools) +- `backups_list` - List all backups +- `backups_get` - Get backup details + +### Firewall Management (10 tools) +- `firewall_list_groups` - List firewall groups +- `firewall_get_group` - Get group details (**smart**: by description or UUID) +- `firewall_create_group` - Create firewall group +- `firewall_update_group` - Update group description (**smart**: by description or UUID) +- `firewall_delete_group` - Delete firewall group (**smart**: by description or UUID) +- `firewall_list_rules` - List rules in a group (**smart**: by description or UUID) +- `firewall_get_rule` - Get specific rule (**smart**: group by description or UUID) +- `firewall_create_rule` - Add firewall rule (**smart**: group by description or UUID) +- `firewall_delete_rule` - Remove firewall rule (**smart**: group by description or UUID) +- `firewall_setup_web_server_rules` - Quick setup for web servers (**smart**: group by description or UUID) + +### Snapshot Management (6 tools) +- `snapshots_list` - List all snapshots +- `snapshots_get` - Get snapshot details (**smart**: by description or UUID) +- `snapshots_create` - Create instance snapshot +- `snapshots_create_from_url` - Create snapshot from URL +- `snapshots_update` - Update snapshot description (**smart**: by description or UUID) +- `snapshots_delete` - Delete snapshot (**smart**: by description or UUID) + +### Region Information (4 tools) +- `regions_list` - List all available regions +- `regions_get_availability` - Check plan availability in region +- `regions_find_regions_with_plan` - Find regions for specific plan +- `regions_list_by_continent` - Filter regions by continent + +### Reserved IP Management (12 tools) +- `reserved_ips_list` - List all reserved IPs +- `reserved_ips_get` - Get reserved IP details (**smart**: by IP address or UUID) +- `reserved_ips_create` - Reserve new IP address +- `reserved_ips_update` - Update reserved IP label (**smart**: by IP address or UUID) +- `reserved_ips_delete` - Release reserved IP (**smart**: by IP address or UUID) +- `reserved_ips_attach` - Attach IP to instance (**smart**: by IP address or UUID) +- `reserved_ips_detach` - Detach IP from instance (**smart**: by IP address or UUID) +- `reserved_ips_convert_instance_ip` - Convert instance IP to reserved +- `reserved_ips_list_by_region` - List IPs in specific region +- `reserved_ips_list_unattached` - List unattached IPs +- `reserved_ips_list_attached` - List attached IPs with instance info ## CLI Commands @@ -162,12 +278,45 @@ mcp-vultr records list example.com mcp-vultr records add example.com A www 192.168.1.100 mcp-vultr records delete example.com record-id +# Zone file operations +mcp-vultr zones export example.com > example.zone +mcp-vultr zones import example.com example.zone + +# Instance management (with smart identifier resolution) +mcp-vultr instances list +mcp-vultr instances create --region ewr --plan vc2-1c-1gb --os 387 --label web-server +mcp-vultr instances get web-server # By label +mcp-vultr instances stop production.local # By hostname +mcp-vultr instances reboot web-server # By label + +# SSH key management (with smart identifier resolution) +mcp-vultr ssh-keys list +mcp-vultr ssh-keys add laptop-key "ssh-rsa AAAAB3..." +mcp-vultr ssh-keys delete laptop-key # By name + +# Firewall management (with smart identifier resolution) +mcp-vultr firewall groups list +mcp-vultr firewall groups create "web-servers" +mcp-vultr firewall rules list web-servers # By description +mcp-vultr firewall rules add web-servers --port 443 --protocol tcp + +# Snapshot management (with smart identifier resolution) +mcp-vultr snapshots list +mcp-vultr snapshots create instance-id --description "backup-2024-01" +mcp-vultr snapshots delete backup-2024-01 # By description + +# Reserved IP management (with smart identifier resolution) +mcp-vultr reserved-ips list +mcp-vultr reserved-ips create --region ewr --type v4 --label production-ip +mcp-vultr reserved-ips attach 192.168.1.100 instance-id # By IP +mcp-vultr reserved-ips delete 192.168.1.100 # By IP + # Setup utilities mcp-vultr setup-website example.com 192.168.1.100 mcp-vultr setup-email example.com mail.example.com # Start MCP server -mcp-vultr server +vultr-mcp-server ``` ## Testing @@ -240,6 +389,44 @@ client = VultrDNSClient("your-api-key") server = create_mcp_server("your-api-key") ``` +## Changelog + +### v1.9.0 (Latest) +- **Feature**: Universal UUID lookup pattern across all modules - use human-readable names everywhere! + - Instances: lookup by label or hostname + - SSH Keys: lookup by name + - Firewall Groups: lookup by description + - Snapshots: lookup by description + - Reserved IPs: lookup by IP address +- **Feature**: All UUID lookups use exact matching for safety +- **Enhancement**: Improved error messages when resources not found + +### v1.8.1 +- **Feature**: Smart identifier resolution for Reserved IPs +- **Fix**: Reserved IP tools now accept IP addresses directly + +### v1.8.0 +- **Feature**: Complete Reserved IP management (12 new tools) +- **Feature**: Support for IPv4 and IPv6 reserved IPs +- **Feature**: Convert existing instance IPs to reserved + +### v1.1.0 +- **Feature**: Zone file import/export functionality +- **Feature**: Standard DNS zone file format support + +### v1.0.1 +- **Major**: Migrated to FastMCP 2.0 framework +- **Feature**: Custom exception hierarchy for better error handling +- **Feature**: Enhanced IPv6 validation with ipaddress module +- **Feature**: HTTP request timeouts (30s total, 10s connect) +- **Feature**: Full uv package manager integration +- **Fix**: Resolved event loop issues with FastMCP + +### v1.0.0 +- Initial release with complete MCP server implementation +- Support for DNS, Instances, SSH Keys, Backups, Firewall, Snapshots, Regions +- CLI interface and Python client library + ## License This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. diff --git a/examples.py b/examples.py index 0474d92..92d5626 100644 --- a/examples.py +++ b/examples.py @@ -155,10 +155,10 @@ async def main(): print("\n" + "=" * 50) print("๐Ÿ“š More Information:") - print(" โ€ข Documentation: https://vultr-dns-mcp.readthedocs.io/") - print(" โ€ข PyPI: https://pypi.org/project/vultr-dns-mcp/") - print(" โ€ข CLI Help: vultr-dns-mcp --help") - print(" โ€ข Start MCP Server: vultr-dns-mcp server") + print(" โ€ข Documentation: https://mcp-vultr.readthedocs.io/") + print(" โ€ข PyPI: https://pypi.org/project/mcp-vultr/") + print(" โ€ข CLI Help: mcp-vultr --help") + print(" โ€ข Start MCP Server: mcp-vultr server") if __name__ == "__main__": diff --git a/install_dev.sh b/install_dev.sh index 629110f..c0248a1 100644 --- a/install_dev.sh +++ b/install_dev.sh @@ -1,11 +1,11 @@ #!/bin/bash -# Development installation script for vultr-dns-mcp +# Development installation script for mcp-vultr # This script installs the package in development mode for testing set -e -echo "๐Ÿ”ง Installing vultr-dns-mcp in development mode..." +echo "๐Ÿ”ง Installing mcp-vultr in development mode..." # Change to package directory cd "$(dirname "$0")" @@ -21,8 +21,8 @@ if command -v uv &> /dev/null; then echo "โœ… Installation complete!" echo "" echo "๐Ÿš€ You can now run:" - echo " vultr-dns-mcp --help" - echo " vultr-dns-mcp server" + echo " mcp-vultr --help" + echo " mcp-vultr server" echo "" echo "๐Ÿงช Run tests with:" echo " uv run pytest" @@ -55,8 +55,8 @@ else echo "โœ… Installation complete!" echo "" echo "๐Ÿš€ You can now run:" - echo " vultr-dns-mcp --help" - echo " vultr-dns-mcp server" + echo " mcp-vultr --help" + echo " mcp-vultr server" echo "" echo "๐Ÿงช Run tests with:" echo " pytest" diff --git a/pyproject.toml b/pyproject.toml index dc805e1..03ebfd8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mcp-vultr" -version = "1.1.0" +version = "1.9.0" description = "A comprehensive Model Context Protocol (MCP) server for managing Vultr DNS records" readme = "README.md" license = {text = "MIT"} @@ -73,11 +73,11 @@ test = [ ] [project.urls] -Homepage = "https://github.com/rsp2k/vultr-dns-mcp" -Documentation = "https://vultr-dns-mcp.readthedocs.io/" -Repository = "https://github.com/rsp2k/vultr-dns-mcp.git" -"Bug Tracker" = "https://github.com/rsp2k/vultr-dns-mcp/issues" -Changelog = "https://github.com/rsp2k/vultr-dns-mcp/blob/main/CHANGELOG.md" +Homepage = "https://github.com/rsp2k/mcp-vultr" +Documentation = "https://mcp-vultr.readthedocs.io/" +Repository = "https://github.com/rsp2k/mcp-vultr.git" +"Bug Tracker" = "https://github.com/rsp2k/mcp-vultr/issues" +Changelog = "https://github.com/rsp2k/mcp-vultr/blob/main/CHANGELOG.md" [project.scripts] mcp-vultr = "mcp_vultr.cli:main" @@ -144,7 +144,7 @@ addopts = [ "--strict-config", "--verbose", "--tb=short", - "--cov=vultr_dns_mcp", + "--cov=mcp_vultr", "--cov-report=term-missing", "--cov-report=html", "--cov-report=xml", diff --git a/src/mcp_vultr/_version.py b/src/mcp_vultr/_version.py index b4de1cd..b855e35 100644 --- a/src/mcp_vultr/_version.py +++ b/src/mcp_vultr/_version.py @@ -1,4 +1,4 @@ -"""Version information for vultr-dns-mcp package.""" +"""Version information for mcp-vultr package.""" -__version__ = "1.1.0" +__version__ = "1.9.0" __version_info__ = tuple(int(i) for i in __version__.split(".") if i.isdigit()) diff --git a/src/mcp_vultr/backups.py b/src/mcp_vultr/backups.py new file mode 100644 index 0000000..9a73c4b --- /dev/null +++ b/src/mcp_vultr/backups.py @@ -0,0 +1,70 @@ +""" +Vultr Backups FastMCP Module. + +This module contains FastMCP tools and resources for managing Vultr backups. +""" + +from typing import List, Dict, Any +from fastmcp import FastMCP + + +def create_backups_mcp(vultr_client) -> FastMCP: + """ + Create a FastMCP instance for Vultr backups management. + + Args: + vultr_client: VultrDNSServer instance + + Returns: + Configured FastMCP instance with backup management tools + """ + mcp = FastMCP(name="vultr-backups") + + # Backup resources + @mcp.resource("backups://list") + async def list_backups_resource() -> List[Dict[str, Any]]: + """List all backups in your Vultr account.""" + return await vultr_client.list_backups() + + @mcp.resource("backups://{backup_id}") + async def get_backup_resource(backup_id: str) -> Dict[str, Any]: + """Get information about a specific backup. + + Args: + backup_id: The backup ID to get information for + """ + return await vultr_client.get_backup(backup_id) + + # Backup tools + @mcp.tool + async def list() -> List[Dict[str, Any]]: + """List all backups in your Vultr account. + + Returns: + List of backup objects with details including: + - id: Backup ID + - date_created: Creation date + - description: Backup description + - size: Size in bytes + - status: Backup status + """ + return await vultr_client.list_backups() + + @mcp.tool + async def get(backup_id: str) -> Dict[str, Any]: + """Get information about a specific backup. + + Args: + backup_id: The backup ID to get information for + + Returns: + Backup information including: + - id: Backup ID + - date_created: Creation date + - description: Backup description + - size: Size in bytes + - status: Backup status + """ + return await vultr_client.get_backup(backup_id) + + return mcp \ No newline at end of file diff --git a/src/mcp_vultr/dns.py b/src/mcp_vultr/dns.py new file mode 100644 index 0000000..4b8fe6e --- /dev/null +++ b/src/mcp_vultr/dns.py @@ -0,0 +1,298 @@ +""" +Vultr DNS FastMCP Module. + +This module contains FastMCP tools and resources for managing Vultr DNS domains and records. +""" + +from typing import Optional, List, Dict, Any +from fastmcp import FastMCP + + +def create_dns_mcp(vultr_client) -> FastMCP: + """ + Create a FastMCP instance for Vultr DNS management. + + Args: + vultr_client: VultrDNSServer instance + + Returns: + Configured FastMCP instance with DNS management tools + """ + mcp = FastMCP(name="vultr-dns") + + # DNS Domain resources + @mcp.resource("domains://list") + async def list_domains_resource() -> List[Dict[str, Any]]: + """List all DNS domains in your Vultr account.""" + return await vultr_client.list_domains() + + @mcp.resource("domains://{domain}") + async def get_domain_resource(domain: str) -> Dict[str, Any]: + """Get details for a specific DNS domain. + + Args: + domain: The domain name to get details for + """ + return await vultr_client.get_domain(domain) + + @mcp.resource("domains://{domain}/records") + async def list_records_resource(domain: str) -> List[Dict[str, Any]]: + """List all DNS records for a domain. + + Args: + domain: The domain name to list records for + """ + return await vultr_client.list_records(domain) + + @mcp.resource("domains://{domain}/records/{record_id}") + async def get_record_resource(domain: str, record_id: str) -> Dict[str, Any]: + """Get details for a specific DNS record. + + Args: + domain: The domain name + record_id: The record ID to get details for + """ + return await vultr_client.get_record(domain, record_id) + + @mcp.resource("domains://{domain}/analysis") + async def analyze_domain_resource(domain: str) -> Dict[str, Any]: + """Analyze DNS records for a domain and provide recommendations. + + Args: + domain: The domain name to analyze + """ + return await vultr_client.analyze_records(domain) + + @mcp.resource("domains://{domain}/zone-file") + async def export_zone_file_resource(domain: str) -> str: + """Export domain records as standard DNS zone file format. + + Args: + domain: The domain name to export + """ + return await vultr_client.export_zone_file(domain) + + # DNS Domain tools + @mcp.tool + async def list_domains() -> List[Dict[str, Any]]: + """List all DNS domains in your Vultr account. + + Returns: + List of domain objects with details including: + - domain: Domain name + - date_created: Creation date + - dns_sec: DNSSEC status + """ + return await vultr_client.list_domains() + + @mcp.tool + async def get_domain(domain: str) -> Dict[str, Any]: + """Get details for a specific DNS domain. + + Args: + domain: The domain name to get details for + + Returns: + Domain details including creation date and DNSSEC status + """ + return await vultr_client.get_domain(domain) + + @mcp.tool + async def create_domain(domain: str, ip: str, dns_sec: str = "disabled") -> Dict[str, Any]: + """Create a new DNS domain. + + Args: + domain: The domain name to create + ip: The default IP address for the domain + dns_sec: Enable DNSSEC (enabled/disabled, default: disabled) + + Returns: + Created domain information + """ + return await vultr_client.create_domain(domain, ip, dns_sec) + + @mcp.tool + async def delete_domain(domain: str) -> Dict[str, str]: + """Delete a DNS domain and all its records. + + Args: + domain: The domain name to delete + + Returns: + Status message confirming deletion + """ + await vultr_client.delete_domain(domain) + return {"status": "success", "message": f"Domain {domain} deleted successfully"} + + # DNS Record tools + @mcp.tool + async def list_records(domain: str) -> List[Dict[str, Any]]: + """List all DNS records for a domain. + + Args: + domain: The domain name to list records for + + Returns: + List of DNS records with details + """ + return await vultr_client.list_records(domain) + + @mcp.tool + async def get_record(domain: str, record_id: str) -> Dict[str, Any]: + """Get details for a specific DNS record. + + Args: + domain: The domain name + record_id: The record ID to get details for + + Returns: + DNS record details + """ + return await vultr_client.get_record(domain, record_id) + + @mcp.tool + async def create_record( + domain: str, + record_type: str, + name: str, + data: str, + ttl: int = 300, + priority: Optional[int] = None + ) -> Dict[str, Any]: + """Create a new DNS record. + + Args: + domain: The domain name + record_type: Record type (A, AAAA, CNAME, MX, TXT, NS, SRV) + name: Record name/subdomain + data: Record data/value + ttl: Time to live in seconds (default: 300) + priority: Priority for MX/SRV records + + Returns: + Created record information + """ + return await vultr_client.create_record(domain, record_type, name, data, ttl, priority) + + @mcp.tool + async def update_record( + domain: str, + record_id: str, + name: Optional[str] = None, + data: Optional[str] = None, + ttl: Optional[int] = None, + priority: Optional[int] = None + ) -> Dict[str, Any]: + """Update an existing DNS record. + + Args: + domain: The domain name + record_id: The record ID to update + name: New record name (optional) + data: New record data (optional) + ttl: New TTL value (optional) + priority: New priority for MX/SRV records (optional) + + Returns: + Updated record information + """ + return await vultr_client.update_record(domain, record_id, name, data, ttl, priority) + + @mcp.tool + async def delete_record(domain: str, record_id: str) -> Dict[str, str]: + """Delete a DNS record. + + Args: + domain: The domain name + record_id: The record ID to delete + + Returns: + Status message confirming deletion + """ + await vultr_client.delete_record(domain, record_id) + return {"status": "success", "message": f"Record {record_id} deleted successfully"} + + @mcp.tool + async def validate_record( + record_type: str, + name: str, + data: str, + ttl: int = 300, + priority: Optional[int] = None + ) -> Dict[str, Any]: + """Validate a DNS record before creation. + + Args: + record_type: Record type (A, AAAA, CNAME, MX, TXT, NS, SRV) + name: Record name/subdomain + data: Record data/value + ttl: Time to live in seconds + priority: Priority for MX/SRV records + + Returns: + Validation results with any errors or warnings + """ + return await vultr_client.validate_record(record_type, name, data, ttl, priority) + + @mcp.tool + async def analyze_domain(domain: str) -> Dict[str, Any]: + """Analyze DNS configuration for a domain and provide recommendations. + + Args: + domain: The domain name to analyze + + Returns: + Analysis results with recommendations for improvements + """ + return await vultr_client.analyze_records(domain) + + @mcp.tool + async def setup_website(domain: str, ip: str, www_enabled: bool = True) -> List[Dict[str, Any]]: + """Set up basic DNS records for a website. + + Args: + domain: The domain name + ip: The website IP address + www_enabled: Whether to create www subdomain record (default: True) + + Returns: + List of created DNS records + """ + records = [] + + # Create A record for domain + records.append(await vultr_client.create_record(domain, "A", "@", ip)) + + # Create www CNAME if enabled + if www_enabled: + records.append(await vultr_client.create_record(domain, "CNAME", "www", domain)) + + return records + + @mcp.tool + async def export_zone_file(domain: str) -> str: + """Export domain records as standard DNS zone file format. + + Args: + domain: The domain name to export + + Returns: + DNS zone file content as string + """ + return await vultr_client.export_zone_file(domain) + + @mcp.tool + async def import_zone_file(domain: str, zone_data: str, dry_run: bool = False) -> List[Dict[str, Any]]: + """Import DNS records from zone file format. + + Args: + domain: The domain name to import records to + zone_data: DNS zone file content as string + dry_run: If True, only validate and return what would be created without making changes + + Returns: + List of created records or validation results + """ + return await vultr_client.import_zone_file(domain, zone_data, dry_run) + + return mcp \ No newline at end of file diff --git a/src/mcp_vultr/fastmcp_server.py b/src/mcp_vultr/fastmcp_server.py index 2afef8d..2f3eaf4 100644 --- a/src/mcp_vultr/fastmcp_server.py +++ b/src/mcp_vultr/fastmcp_server.py @@ -6,9 +6,17 @@ through the Vultr API using the FastMCP framework. """ import os -from typing import Optional, List, Dict, Any +from typing import Optional from fastmcp import FastMCP from .server import VultrDNSServer +from .instances import create_instances_mcp +from .dns import create_dns_mcp +from .ssh_keys import create_ssh_keys_mcp +from .backups import create_backups_mcp +from .firewall import create_firewall_mcp +from .snapshots import create_snapshots_mcp +from .regions import create_regions_mcp +from .reserved_ips import create_reserved_ips_mcp def create_vultr_mcp_server(api_key: Optional[str] = None) -> FastMCP: @@ -29,255 +37,36 @@ def create_vultr_mcp_server(api_key: Optional[str] = None) -> FastMCP: "VULTR_API_KEY must be provided either as parameter or environment variable" ) - # Create FastMCP server - mcp = FastMCP(name="vultr-dns-mcp") + # Create main FastMCP server + mcp = FastMCP(name="mcp-vultr") # Initialize Vultr client vultr_client = VultrDNSServer(api_key) - @mcp.resource("dns://domains") - async def list_dns_domains() -> List[Dict[str, Any]]: - """List all DNS domains in your Vultr account.""" - return await vultr_client.list_domains() + # Mount all modules with appropriate prefixes + dns_mcp = create_dns_mcp(vultr_client) + mcp.mount("dns", dns_mcp) - @mcp.resource("dns://domains/{domain}") - async def get_dns_domain(domain: str) -> Dict[str, Any]: - """Get details for a specific DNS domain. - - Args: - domain: The domain name to get details for - """ - return await vultr_client.get_domain(domain) + instances_mcp = create_instances_mcp(vultr_client) + mcp.mount("instances", instances_mcp) - @mcp.tool - async def create_dns_domain(domain: str, ip: str, dns_sec: str = "disabled") -> Dict[str, Any]: - """Create a new DNS domain. - - Args: - domain: The domain name to create - ip: The default IP address for the domain - dns_sec: Enable DNSSEC (enabled/disabled, default: disabled) - """ - return await vultr_client.create_domain(domain, ip, dns_sec) + ssh_keys_mcp = create_ssh_keys_mcp(vultr_client) + mcp.mount("ssh_keys", ssh_keys_mcp) - @mcp.tool - async def delete_dns_domain(domain: str) -> Dict[str, str]: - """Delete a DNS domain and all its records. - - Args: - domain: The domain name to delete - """ - await vultr_client.delete_domain(domain) - return {"status": "success", "message": f"Domain {domain} deleted successfully"} + backups_mcp = create_backups_mcp(vultr_client) + mcp.mount("backups", backups_mcp) - @mcp.resource("dns://domains/{domain}/records") - async def list_dns_records(domain: str) -> List[Dict[str, Any]]: - """List all DNS records for a domain. - - Args: - domain: The domain name to list records for - """ - return await vultr_client.list_records(domain) + firewall_mcp = create_firewall_mcp(vultr_client) + mcp.mount("firewall", firewall_mcp) - @mcp.resource("dns://domains/{domain}/records/{record_id}") - async def get_dns_record(domain: str, record_id: str) -> Dict[str, Any]: - """Get details for a specific DNS record. - - Args: - domain: The domain name - record_id: The record ID to get details for - """ - return await vultr_client.get_record(domain, record_id) + snapshots_mcp = create_snapshots_mcp(vultr_client) + mcp.mount("snapshots", snapshots_mcp) - @mcp.tool - async def create_dns_record( - domain: str, - record_type: str, - name: str, - data: str, - ttl: int = 300, - priority: Optional[int] = None - ) -> Dict[str, Any]: - """Create a new DNS record. - - Args: - domain: The domain name - record_type: Record type (A, AAAA, CNAME, MX, TXT, NS, SRV) - name: Record name/subdomain - data: Record data/value - ttl: Time to live in seconds (default: 300) - priority: Priority for MX/SRV records - """ - return await vultr_client.create_record(domain, record_type, name, data, ttl, priority) + regions_mcp = create_regions_mcp(vultr_client) + mcp.mount("regions", regions_mcp) - @mcp.tool - async def update_dns_record( - domain: str, - record_id: str, - name: Optional[str] = None, - data: Optional[str] = None, - ttl: Optional[int] = None, - priority: Optional[int] = None - ) -> Dict[str, Any]: - """Update an existing DNS record. - - Args: - domain: The domain name - record_id: The record ID to update - name: New record name (optional) - data: New record data (optional) - ttl: New TTL value (optional) - priority: New priority for MX/SRV records (optional) - """ - return await vultr_client.update_record(domain, record_id, name, data, ttl, priority) - - @mcp.tool - async def delete_dns_record(domain: str, record_id: str) -> Dict[str, str]: - """Delete a DNS record. - - Args: - domain: The domain name - record_id: The record ID to delete - """ - await vultr_client.delete_record(domain, record_id) - return {"status": "success", "message": f"Record {record_id} deleted successfully"} - - @mcp.tool - async def validate_dns_record( - record_type: str, - name: str, - data: str, - ttl: int = 300, - priority: Optional[int] = None - ) -> Dict[str, Any]: - """Validate a DNS record before creation. - - Args: - record_type: Record type (A, AAAA, CNAME, MX, TXT, NS, SRV) - name: Record name/subdomain - data: Record data/value - ttl: Time to live in seconds - priority: Priority for MX/SRV records - """ - return await vultr_client.validate_record(record_type, name, data, ttl, priority) - - @mcp.resource("dns://domains/{domain}/analysis") - async def analyze_dns_records(domain: str) -> Dict[str, Any]: - """Analyze DNS records for a domain and provide recommendations. - - Args: - domain: The domain name to analyze - """ - return await vultr_client.analyze_records(domain) - - @mcp.tool - async def setup_website_dns(domain: str, ip: str, www_enabled: bool = True) -> List[Dict[str, Any]]: - """Set up basic DNS records for a website. - - Args: - domain: The domain name - ip: The website IP address - www_enabled: Whether to create www subdomain record (default: True) - """ - records = [] - - # Create A record for domain - records.append(await vultr_client.create_record(domain, "A", "@", ip)) - - # Create www CNAME if enabled - if www_enabled: - records.append(await vultr_client.create_record(domain, "CNAME", "www", domain)) - - return records - - @mcp.resource("dns://domains/{domain}/zone-file") - async def export_zone_file_resource(domain: str) -> str: - """Export domain records as standard DNS zone file format. - - Args: - domain: The domain name to export - """ - return await vultr_client.export_zone_file(domain) - - # Tool wrappers for resources (for compatibility with Claude Desktop) - @mcp.tool - async def list_domains_tool() -> List[Dict[str, Any]]: - """List all DNS domains in your Vultr account. - - This is a tool wrapper for the dns://domains resource. - """ - return await vultr_client.list_domains() - - @mcp.tool - async def get_domain_tool(domain: str) -> Dict[str, Any]: - """Get details for a specific DNS domain. - - Args: - domain: The domain name to get details for - - This is a tool wrapper for the dns://domains/{domain} resource. - """ - return await vultr_client.get_domain(domain) - - @mcp.tool - async def list_records_tool(domain: str) -> List[Dict[str, Any]]: - """List all DNS records for a domain. - - Args: - domain: The domain name to list records for - - This is a tool wrapper for the dns://domains/{domain}/records resource. - """ - return await vultr_client.list_records(domain) - - @mcp.tool - async def get_record_tool(domain: str, record_id: str) -> Dict[str, Any]: - """Get details for a specific DNS record. - - Args: - domain: The domain name - record_id: The record ID to get details for - - This is a tool wrapper for the dns://domains/{domain}/records/{record_id} resource. - """ - return await vultr_client.get_record(domain, record_id) - - @mcp.tool - async def analyze_domain_tool(domain: str) -> Dict[str, Any]: - """Analyze DNS configuration for a domain and provide recommendations. - - Args: - domain: The domain name to analyze - - This is a tool wrapper for the dns://domains/{domain}/analysis resource. - """ - return await vultr_client.analyze_records(domain) - - @mcp.tool - async def export_zone_file_tool(domain: str) -> str: - """Export domain records as standard DNS zone file format. - - Args: - domain: The domain name to export - - Returns: - DNS zone file content as string - """ - return await vultr_client.export_zone_file(domain) - - @mcp.tool - async def import_zone_file_tool(domain: str, zone_data: str, dry_run: bool = False) -> List[Dict[str, Any]]: - """Import DNS records from zone file format. - - Args: - domain: The domain name to import records to - zone_data: DNS zone file content as string - dry_run: If True, only validate and return what would be created without making changes - - Returns: - List of created records or validation results - """ - return await vultr_client.import_zone_file(domain, zone_data, dry_run) + reserved_ips_mcp = create_reserved_ips_mcp(vultr_client) + mcp.mount("reserved_ips", reserved_ips_mcp) return mcp diff --git a/src/mcp_vultr/firewall.py b/src/mcp_vultr/firewall.py new file mode 100644 index 0000000..0bc7f5f --- /dev/null +++ b/src/mcp_vultr/firewall.py @@ -0,0 +1,295 @@ +""" +Vultr Firewall FastMCP Module. + +This module contains FastMCP tools and resources for managing Vultr firewall groups and rules. +""" + +from typing import Optional, List, Dict, Any +from fastmcp import FastMCP + + +def create_firewall_mcp(vultr_client) -> FastMCP: + """ + Create a FastMCP instance for Vultr firewall management. + + Args: + vultr_client: VultrDNSServer instance + + Returns: + Configured FastMCP instance with firewall management tools + """ + mcp = FastMCP(name="vultr-firewall") + + # Helper function to check if a string looks like a UUID + def is_uuid_format(s: str) -> bool: + """Check if a string looks like a UUID.""" + if len(s) == 36 and s.count('-') == 4: + return True + return False + + # Helper function to get firewall group ID from description + async def get_firewall_group_id(identifier: str) -> str: + """ + Get the firewall group ID from a description or UUID. + + Args: + identifier: Firewall group description or UUID + + Returns: + The firewall group ID (UUID) + + Raises: + ValueError: If the firewall group is not found + """ + # If it looks like a UUID, return it as-is + if is_uuid_format(identifier): + return identifier + + # Otherwise, search for it by description + groups = await vultr_client.list_firewall_groups() + for group in groups: + if group.get("description") == identifier: + return group["id"] + + raise ValueError(f"Firewall group '{identifier}' not found") + + # Firewall Group resources + @mcp.resource("firewall://groups") + async def list_groups_resource() -> List[Dict[str, Any]]: + """List all firewall groups in your Vultr account.""" + return await vultr_client.list_firewall_groups() + + @mcp.resource("firewall://groups/{firewall_group_id}") + async def get_group_resource(firewall_group_id: str) -> Dict[str, Any]: + """Get information about a specific firewall group. + + Args: + firewall_group_id: The firewall group ID or description + """ + actual_id = await get_firewall_group_id(firewall_group_id) + return await vultr_client.get_firewall_group(actual_id) + + @mcp.resource("firewall://groups/{firewall_group_id}/rules") + async def list_rules_resource(firewall_group_id: str) -> List[Dict[str, Any]]: + """List all rules in a firewall group. + + Args: + firewall_group_id: The firewall group ID or description + """ + actual_id = await get_firewall_group_id(firewall_group_id) + return await vultr_client.list_firewall_rules(actual_id) + + @mcp.resource("firewall://groups/{firewall_group_id}/rules/{firewall_rule_id}") + async def get_rule_resource(firewall_group_id: str, firewall_rule_id: str) -> Dict[str, Any]: + """Get information about a specific firewall rule. + + Args: + firewall_group_id: The firewall group ID or description + firewall_rule_id: The firewall rule ID + """ + actual_id = await get_firewall_group_id(firewall_group_id) + return await vultr_client.get_firewall_rule(actual_id, firewall_rule_id) + + # Firewall Group tools + @mcp.tool + async def list_groups() -> List[Dict[str, Any]]: + """List all firewall groups in your Vultr account. + + Returns: + List of firewall group objects with details including: + - id: Firewall group ID + - description: Group description + - date_created: Creation date + - date_modified: Last modification date + - instance_count: Number of instances using this group + - rule_count: Number of rules in this group + - max_rule_count: Maximum allowed rules + """ + return await vultr_client.list_firewall_groups() + + @mcp.tool + async def get_group(firewall_group_id: str) -> Dict[str, Any]: + """Get information about a specific firewall group. + + Args: + firewall_group_id: The firewall group ID or description (e.g., "web-servers" or UUID) + + Returns: + Firewall group information + """ + actual_id = await get_firewall_group_id(firewall_group_id) + return await vultr_client.get_firewall_group(actual_id) + + @mcp.tool + async def create_group(description: str) -> Dict[str, Any]: + """Create a new firewall group. + + Args: + description: Description for the firewall group + + Returns: + Created firewall group information + """ + return await vultr_client.create_firewall_group(description) + + @mcp.tool + async def update_group(firewall_group_id: str, description: str) -> Dict[str, str]: + """Update a firewall group description. + + Args: + firewall_group_id: The firewall group ID or description (e.g., "web-servers" or UUID) + description: New description for the firewall group + + Returns: + Status message confirming update + """ + actual_id = await get_firewall_group_id(firewall_group_id) + await vultr_client.update_firewall_group(actual_id, description) + return {"status": "success", "message": f"Firewall group {firewall_group_id} updated successfully"} + + @mcp.tool + async def delete_group(firewall_group_id: str) -> Dict[str, str]: + """Delete a firewall group. + + Args: + firewall_group_id: The firewall group ID or description (e.g., "web-servers" or UUID) + + Returns: + Status message confirming deletion + """ + actual_id = await get_firewall_group_id(firewall_group_id) + await vultr_client.delete_firewall_group(actual_id) + return {"status": "success", "message": f"Firewall group {firewall_group_id} deleted successfully"} + + # Firewall Rule tools + @mcp.tool + async def list_rules(firewall_group_id: str) -> List[Dict[str, Any]]: + """List all rules in a firewall group. + + Args: + firewall_group_id: The firewall group ID or description (e.g., "web-servers" or UUID) + + Returns: + List of firewall rules with details + """ + actual_id = await get_firewall_group_id(firewall_group_id) + return await vultr_client.list_firewall_rules(actual_id) + + @mcp.tool + async def get_rule(firewall_group_id: str, firewall_rule_id: str) -> Dict[str, Any]: + """Get information about a specific firewall rule. + + Args: + firewall_group_id: The firewall group ID or description (e.g., "web-servers" or UUID) + firewall_rule_id: The firewall rule ID + + Returns: + Firewall rule information + """ + actual_id = await get_firewall_group_id(firewall_group_id) + return await vultr_client.get_firewall_rule(actual_id, firewall_rule_id) + + @mcp.tool + async def create_rule( + firewall_group_id: str, + ip_type: str, + protocol: str, + subnet: str, + subnet_size: int, + port: Optional[str] = None, + source: Optional[str] = None, + notes: Optional[str] = None + ) -> Dict[str, Any]: + """Create a new firewall rule. + + Args: + firewall_group_id: The firewall group ID or description (e.g., "web-servers" or UUID) + ip_type: IP type (v4 or v6) + protocol: Protocol (tcp, udp, icmp, gre) + subnet: IP subnet (use "0.0.0.0" for any IPv4, "::" for any IPv6) + subnet_size: Subnet size (0-32 for IPv4, 0-128 for IPv6) + port: Port or port range (e.g., "80" or "8000:8999") - required for tcp/udp + source: Source type (e.g., "cloudflare") - optional + notes: Notes for the rule - optional + + Returns: + Created firewall rule information + + Examples: + # Allow HTTP from anywhere + create_rule(group_id, "v4", "tcp", "0.0.0.0", 0, port="80") + + # Allow SSH from specific subnet + create_rule(group_id, "v4", "tcp", "192.168.1.0", 24, port="22", notes="Office network") + + # Allow ping from anywhere + create_rule(group_id, "v4", "icmp", "0.0.0.0", 0) + """ + actual_id = await get_firewall_group_id(firewall_group_id) + return await vultr_client.create_firewall_rule( + actual_id, ip_type, protocol, subnet, subnet_size, port, source, notes + ) + + @mcp.tool + async def delete_rule(firewall_group_id: str, firewall_rule_id: str) -> Dict[str, str]: + """Delete a firewall rule. + + Args: + firewall_group_id: The firewall group ID or description (e.g., "web-servers" or UUID) + firewall_rule_id: The firewall rule ID to delete + + Returns: + Status message confirming deletion + """ + actual_id = await get_firewall_group_id(firewall_group_id) + await vultr_client.delete_firewall_rule(actual_id, firewall_rule_id) + return {"status": "success", "message": f"Firewall rule {firewall_rule_id} deleted successfully"} + + @mcp.tool + async def setup_web_server_rules(firewall_group_id: str, allow_ssh_from: str = "0.0.0.0/0") -> List[Dict[str, Any]]: + """Set up common firewall rules for a web server. + + Args: + firewall_group_id: The firewall group ID or description (e.g., "web-servers" or UUID) + allow_ssh_from: IP subnet to allow SSH from (default: anywhere) + + Returns: + List of created firewall rules + + Creates rules for: + - HTTP (port 80) from anywhere + - HTTPS (port 443) from anywhere + - SSH (port 22) from specified subnet + - ICMP (ping) from anywhere + """ + actual_id = await get_firewall_group_id(firewall_group_id) + rules = [] + + # Parse SSH subnet + ssh_parts = allow_ssh_from.split('/') + ssh_subnet = ssh_parts[0] + ssh_size = int(ssh_parts[1]) if len(ssh_parts) > 1 else 0 + + # HTTP + rules.append(await vultr_client.create_firewall_rule( + actual_id, "v4", "tcp", "0.0.0.0", 0, port="80", notes="HTTP" + )) + + # HTTPS + rules.append(await vultr_client.create_firewall_rule( + actual_id, "v4", "tcp", "0.0.0.0", 0, port="443", notes="HTTPS" + )) + + # SSH + rules.append(await vultr_client.create_firewall_rule( + actual_id, "v4", "tcp", ssh_subnet, ssh_size, port="22", notes="SSH" + )) + + # ICMP (ping) + rules.append(await vultr_client.create_firewall_rule( + actual_id, "v4", "icmp", "0.0.0.0", 0, notes="ICMP/Ping" + )) + + return rules + + return mcp \ No newline at end of file diff --git a/src/mcp_vultr/instances.py b/src/mcp_vultr/instances.py new file mode 100644 index 0000000..447123a --- /dev/null +++ b/src/mcp_vultr/instances.py @@ -0,0 +1,379 @@ +""" +Vultr Instances FastMCP Module. + +This module contains FastMCP tools and resources for managing Vultr instances. +""" + +from typing import Optional, List, Dict, Any +from fastmcp import FastMCP + + +def create_instances_mcp(vultr_client) -> FastMCP: + """ + Create a FastMCP instance for Vultr instances management. + + Args: + vultr_client: VultrDNSServer instance + + Returns: + Configured FastMCP instance with instance management tools + """ + mcp = FastMCP(name="vultr-instances") + + # Helper function to check if a string looks like a UUID + def is_uuid_format(s: str) -> bool: + """Check if a string looks like a UUID.""" + if len(s) == 36 and s.count('-') == 4: + return True + return False + + # Helper function to get instance ID from label or hostname + async def get_instance_id(identifier: str) -> str: + """ + Get the instance ID from a label, hostname, or UUID. + + Args: + identifier: Instance label, hostname, or UUID + + Returns: + The instance ID (UUID) + + Raises: + ValueError: If the instance is not found + """ + # If it looks like a UUID, return it as-is + if is_uuid_format(identifier): + return identifier + + # Otherwise, search for it by label or hostname + instances = await vultr_client.list_instances() + for instance in instances: + if instance.get("label") == identifier or instance.get("hostname") == identifier: + return instance["id"] + + raise ValueError(f"Instance '{identifier}' not found (searched by label and hostname)") + + # Instance resources + @mcp.resource("instances://list") + async def list_instances_resource() -> List[Dict[str, Any]]: + """List all instances in your Vultr account.""" + return await vultr_client.list_instances() + + @mcp.resource("instances://{instance_id}") + async def get_instance_resource(instance_id: str) -> Dict[str, Any]: + """Get information about a specific instance. + + Args: + instance_id: The instance ID, label, or hostname + """ + actual_id = await get_instance_id(instance_id) + return await vultr_client.get_instance(actual_id) + + # Instance tools + @mcp.tool + async def list() -> List[Dict[str, Any]]: + """List all instances in your Vultr account. + + Returns: + List of instance objects with details including: + - id: Instance ID + - label: Instance label + - hostname: Instance hostname + - region: Region code + - plan: Plan ID + - os: Operating system + - status: Instance status (active, pending, etc.) + - main_ip: Primary IPv4 address + - v6_main_ip: Primary IPv6 address + - date_created: Creation date + """ + return await vultr_client.list_instances() + + @mcp.tool + async def get(instance_id: str) -> Dict[str, Any]: + """Get detailed information about a specific instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + + Returns: + Detailed instance information + """ + actual_id = await get_instance_id(instance_id) + return await vultr_client.get_instance(actual_id) + + @mcp.tool + async def create( + region: str, + plan: str, + label: Optional[str] = None, + os_id: Optional[int] = None, + iso_id: Optional[str] = None, + script_id: Optional[str] = None, + snapshot_id: Optional[str] = None, + enable_ipv6: bool = False, + enable_private_network: bool = False, + attach_private_network: Optional[List[str]] = None, + ssh_key_ids: Optional[List[str]] = None, + backups: bool = False, + app_id: Optional[int] = None, + user_data: Optional[str] = None, + ddos_protection: bool = False, + activation_email: bool = False, + hostname: Optional[str] = None, + tag: Optional[str] = None, + firewall_group_id: Optional[str] = None, + reserved_ipv4: Optional[str] = None + ) -> Dict[str, Any]: + """Create a new instance. + + Args: + region: Region code (e.g., 'ewr', 'lax') + plan: Plan ID (e.g., 'vc2-1c-1gb') + label: Label for the instance + os_id: Operating System ID (use list_os to get available options) + iso_id: ISO ID for custom installation + script_id: Startup script ID + snapshot_id: Snapshot ID to restore from + enable_ipv6: Enable IPv6 + enable_private_network: Enable private networking + attach_private_network: List of private network IDs to attach + ssh_key_ids: List of SSH key IDs to install + backups: Enable automatic backups + app_id: Application ID to install + user_data: Cloud-init user data + ddos_protection: Enable DDoS protection + activation_email: Send activation email + hostname: Hostname for the instance + tag: Tag for the instance + firewall_group_id: Firewall group ID + reserved_ipv4: Reserved IPv4 address to use + + Returns: + Created instance information + """ + return await vultr_client.create_instance( + region=region, + plan=plan, + label=label, + os_id=os_id, + iso_id=iso_id, + script_id=script_id, + snapshot_id=snapshot_id, + enable_ipv6=enable_ipv6, + enable_private_network=enable_private_network, + attach_private_network=attach_private_network, + ssh_key_ids=ssh_key_ids, + backups=backups, + app_id=app_id, + user_data=user_data, + ddos_protection=ddos_protection, + activation_email=activation_email, + hostname=hostname, + tag=tag, + firewall_group_id=firewall_group_id, + reserved_ipv4=reserved_ipv4 + ) + + @mcp.tool + async def update( + instance_id: str, + label: Optional[str] = None, + tag: Optional[str] = None, + plan: Optional[str] = None, + enable_ipv6: Optional[bool] = None, + backups: Optional[bool] = None, + ddos_protection: Optional[bool] = None, + firewall_group_id: Optional[str] = None, + user_data: Optional[str] = None + ) -> Dict[str, Any]: + """Update an existing instance. + + Args: + instance_id: The instance ID to update + label: New label for the instance + tag: New tag for the instance + plan: New plan ID (for resizing) + enable_ipv6: Enable/disable IPv6 + backups: Enable/disable automatic backups + ddos_protection: Enable/disable DDoS protection + firewall_group_id: New firewall group ID + user_data: New cloud-init user data + + Returns: + Updated instance information + """ + return await vultr_client.update_instance( + instance_id=instance_id, + label=label, + tag=tag, + plan=plan, + enable_ipv6=enable_ipv6, + backups=backups, + ddos_protection=ddos_protection, + firewall_group_id=firewall_group_id, + user_data=user_data + ) + + @mcp.tool + async def delete(instance_id: str) -> Dict[str, str]: + """Delete an instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + + Returns: + Status message confirming deletion + """ + actual_id = await get_instance_id(instance_id) + await vultr_client.delete_instance(actual_id) + return {"status": "success", "message": f"Instance {instance_id} deleted successfully"} + + @mcp.tool + async def start(instance_id: str) -> Dict[str, str]: + """Start a stopped instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + + Returns: + Status message confirming start + """ + actual_id = await get_instance_id(instance_id) + await vultr_client.start_instance(actual_id) + return {"status": "success", "message": f"Instance {instance_id} started successfully"} + + @mcp.tool + async def stop(instance_id: str) -> Dict[str, str]: + """Stop a running instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + + Returns: + Status message confirming stop + """ + actual_id = await get_instance_id(instance_id) + await vultr_client.stop_instance(actual_id) + return {"status": "success", "message": f"Instance {instance_id} stopped successfully"} + + @mcp.tool + async def reboot(instance_id: str) -> Dict[str, str]: + """Reboot an instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + + Returns: + Status message confirming reboot + """ + actual_id = await get_instance_id(instance_id) + await vultr_client.reboot_instance(actual_id) + return {"status": "success", "message": f"Instance {instance_id} rebooted successfully"} + + @mcp.tool + async def reinstall(instance_id: str, hostname: Optional[str] = None) -> Dict[str, Any]: + """Reinstall an instance's operating system. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + hostname: New hostname for the instance (optional) + + Returns: + Reinstall status information + """ + actual_id = await get_instance_id(instance_id) + return await vultr_client.reinstall_instance(actual_id, hostname) + + # Bandwidth information + @mcp.resource("instances://{instance_id}/bandwidth") + async def get_bandwidth_resource(instance_id: str) -> Dict[str, Any]: + """Get bandwidth usage for an instance. + + Args: + instance_id: The instance ID, label, or hostname + """ + actual_id = await get_instance_id(instance_id) + return await vultr_client.get_instance_bandwidth(actual_id) + + @mcp.tool + async def get_bandwidth(instance_id: str) -> Dict[str, Any]: + """Get bandwidth usage statistics for an instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + + Returns: + Bandwidth usage information + """ + actual_id = await get_instance_id(instance_id) + return await vultr_client.get_instance_bandwidth(actual_id) + + # IPv4 management + @mcp.tool + async def list_ipv4(instance_id: str) -> List[Dict[str, Any]]: + """List IPv4 addresses for an instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + + Returns: + List of IPv4 addresses + """ + actual_id = await get_instance_id(instance_id) + return await vultr_client.list_instance_ipv4(actual_id) + + @mcp.tool + async def create_ipv4(instance_id: str, reboot: bool = True) -> Dict[str, Any]: + """Create a new IPv4 address for an instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + reboot: Whether to reboot the instance (default: True) + + Returns: + Created IPv4 information + """ + actual_id = await get_instance_id(instance_id) + return await vultr_client.create_instance_ipv4(actual_id, reboot) + + @mcp.tool + async def delete_ipv4(instance_id: str, ipv4: str) -> Dict[str, str]: + """Delete an IPv4 address from an instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + ipv4: The IPv4 address to delete + + Returns: + Status message confirming deletion + """ + actual_id = await get_instance_id(instance_id) + await vultr_client.delete_instance_ipv4(actual_id, ipv4) + return {"status": "success", "message": f"IPv4 {ipv4} deleted successfully"} + + # IPv6 management + @mcp.resource("instances://{instance_id}/ipv6") + async def list_ipv6_resource(instance_id: str) -> List[Dict[str, Any]]: + """List IPv6 addresses for an instance. + + Args: + instance_id: The instance ID, label, or hostname + """ + actual_id = await get_instance_id(instance_id) + return await vultr_client.list_instance_ipv6(actual_id) + + @mcp.tool + async def list_ipv6(instance_id: str) -> List[Dict[str, Any]]: + """List IPv6 addresses for an instance. + + Args: + instance_id: The instance ID, label, or hostname (e.g., "web-server", "db.example.com", or UUID) + + Returns: + List of IPv6 addresses + """ + actual_id = await get_instance_id(instance_id) + return await vultr_client.list_instance_ipv6(actual_id) + + return mcp \ No newline at end of file diff --git a/src/mcp_vultr/regions.py b/src/mcp_vultr/regions.py new file mode 100644 index 0000000..7da3ee6 --- /dev/null +++ b/src/mcp_vultr/regions.py @@ -0,0 +1,116 @@ +""" +Vultr Regions FastMCP Module. + +This module contains FastMCP tools and resources for retrieving Vultr region information. +""" + +from typing import List, Dict, Any +from fastmcp import FastMCP + + +def create_regions_mcp(vultr_client) -> FastMCP: + """ + Create a FastMCP instance for Vultr regions information. + + Args: + vultr_client: VultrDNSServer instance + + Returns: + Configured FastMCP instance with region information tools + """ + mcp = FastMCP(name="vultr-regions") + + # Region resources + @mcp.resource("regions://list") + async def list_regions_resource() -> List[Dict[str, Any]]: + """List all available Vultr regions.""" + return await vultr_client.list_regions() + + @mcp.resource("regions://{region_id}/availability") + async def get_availability_resource(region_id: str) -> Dict[str, Any]: + """Get availability information for a specific region. + + Args: + region_id: The region ID to check availability for + """ + return await vultr_client.list_availability(region_id) + + # Region tools + @mcp.tool + async def list() -> List[Dict[str, Any]]: + """List all available Vultr regions. + + Returns: + List of region objects with details including: + - id: Region ID (e.g., "ewr", "lax", "nrt") + - city: City name + - country: Country code + - continent: Continent name + - options: Available options (e.g., ["ddos_protection"]) + """ + return await vultr_client.list_regions() + + @mcp.tool + async def get_availability(region_id: str) -> Dict[str, Any]: + """Get availability information for a specific region. + + Args: + region_id: The region ID to check availability for (e.g., "ewr", "lax") + + Returns: + Availability information including: + - available_plans: List of available plan IDs in this region + + This is useful for checking which instance plans are available + in a specific region before creating instances. + """ + return await vultr_client.list_availability(region_id) + + @mcp.tool + async def find_regions_with_plan(plan_id: str) -> List[Dict[str, Any]]: + """Find all regions where a specific plan is available. + + Args: + plan_id: The plan ID to search for (e.g., "vc2-1c-1gb") + + Returns: + List of regions where the plan is available, with region details + """ + all_regions = await vultr_client.list_regions() + available_regions = [] + + for region in all_regions: + try: + availability = await vultr_client.list_availability(region["id"]) + if plan_id in availability.get("available_plans", []): + available_regions.append(region) + except Exception: + # Skip regions that might have availability check issues + continue + + return available_regions + + @mcp.tool + async def list_by_continent(continent: str) -> List[Dict[str, Any]]: + """List all regions in a specific continent. + + Args: + continent: Continent name (e.g., "North America", "Europe", "Asia", "Australia") + + Returns: + List of regions in the specified continent + """ + all_regions = await vultr_client.list_regions() + return [r for r in all_regions if r.get("continent", "").lower() == continent.lower()] + + @mcp.tool + async def list_with_ddos_protection() -> List[Dict[str, Any]]: + """List all regions that support DDoS protection. + + Returns: + List of regions with DDoS protection capability + """ + all_regions = await vultr_client.list_regions() + return [r for r in all_regions if "ddos_protection" in r.get("options", [])] + + return mcp \ No newline at end of file diff --git a/src/mcp_vultr/reserved_ips.py b/src/mcp_vultr/reserved_ips.py new file mode 100644 index 0000000..e851d4d --- /dev/null +++ b/src/mcp_vultr/reserved_ips.py @@ -0,0 +1,253 @@ +""" +Vultr Reserved IPs FastMCP Module. + +This module contains FastMCP tools and resources for managing Vultr reserved IPs. +""" + +from typing import List, Dict, Any, Optional +from fastmcp import FastMCP + + +def create_reserved_ips_mcp(vultr_client) -> FastMCP: + """ + Create a FastMCP instance for Vultr reserved IPs management. + + Args: + vultr_client: VultrDNSServer instance + + Returns: + Configured FastMCP instance with reserved IP management tools + """ + mcp = FastMCP(name="vultr-reserved-ips") + + # Helper function to get UUID from IP address + async def get_reserved_ip_uuid(ip_address: str) -> str: + """ + Get the UUID for a reserved IP address. + + Args: + ip_address: The IP address to look up + + Returns: + The UUID of the reserved IP + + Raises: + ValueError: If the IP address is not found + """ + reserved_ips = await vultr_client.list_reserved_ips() + for rip in reserved_ips: + if rip.get("subnet") == ip_address: + return rip["id"] + raise ValueError(f"Reserved IP {ip_address} not found") + + # Reserved IP resources + @mcp.resource("reserved-ips://list") + async def list_reserved_ips_resource() -> List[Dict[str, Any]]: + """List all reserved IPs.""" + return await vultr_client.list_reserved_ips() + + @mcp.resource("reserved-ips://{reserved_ip}") + async def get_reserved_ip_resource(reserved_ip: str) -> Dict[str, Any]: + """Get details of a specific reserved IP. + + Args: + reserved_ip: The reserved IP address + """ + # Try to look up UUID if it looks like an IP address + if "." in reserved_ip or ":" in reserved_ip: + reserved_ip_uuid = await get_reserved_ip_uuid(reserved_ip) + else: + reserved_ip_uuid = reserved_ip + return await vultr_client.get_reserved_ip(reserved_ip_uuid) + + # Reserved IP tools + @mcp.tool + async def list() -> List[Dict[str, Any]]: + """List all reserved IPs in your account. + + Returns: + List of reserved IP objects with details including: + - id: Reserved IP ID + - region: Region ID where IP is reserved + - ip_type: IP type ("v4" or "v6") + - subnet: IP address + - subnet_size: Subnet size + - label: User-defined label + - instance_id: Attached instance ID (if any) + """ + return await vultr_client.list_reserved_ips() + + @mcp.tool + async def get(reserved_ip: str) -> Dict[str, Any]: + """Get details of a specific reserved IP. + + Args: + reserved_ip: The reserved IP address (e.g., "192.168.1.1" or "2001:db8::1") + + Returns: + Reserved IP details including attachment status + """ + # Try to look up UUID if it looks like an IP address + if "." in reserved_ip or ":" in reserved_ip: + reserved_ip_uuid = await get_reserved_ip_uuid(reserved_ip) + else: + reserved_ip_uuid = reserved_ip + return await vultr_client.get_reserved_ip(reserved_ip_uuid) + + @mcp.tool + async def create( + region: str, + ip_type: str = "v4", + label: Optional[str] = None + ) -> Dict[str, Any]: + """Create a new reserved IP in a specific region. + + Args: + region: The region ID where to reserve the IP (e.g., "ewr", "lax") + ip_type: Type of IP to reserve - "v4" for IPv4 or "v6" for IPv6 (default: "v4") + label: Optional label for the reserved IP + + Returns: + Created reserved IP information + + Example: + Create a reserved IPv4 in New Jersey: + create(region="ewr", ip_type="v4", label="web-server-ip") + """ + return await vultr_client.create_reserved_ip(region, ip_type, label) + + @mcp.tool + async def update(reserved_ip: str, label: str) -> str: + """Update a reserved IP's label. + + Args: + reserved_ip: The reserved IP address (e.g., "192.168.1.1" or "2001:db8::1") + label: New label for the reserved IP + + Returns: + Success message + """ + # Try to look up UUID if it looks like an IP address + if "." in reserved_ip or ":" in reserved_ip: + reserved_ip_uuid = await get_reserved_ip_uuid(reserved_ip) + else: + reserved_ip_uuid = reserved_ip + await vultr_client.update_reserved_ip(reserved_ip_uuid, label) + return f"Reserved IP {reserved_ip} label updated to: {label}" + + @mcp.tool + async def delete(reserved_ip: str) -> str: + """Delete a reserved IP. + + Args: + reserved_ip: The reserved IP address to delete (e.g., "192.168.1.1" or "2001:db8::1") + + Returns: + Success message + + Note: The IP must be detached from any instance before deletion. + """ + # Try to look up UUID if it looks like an IP address + if "." in reserved_ip or ":" in reserved_ip: + reserved_ip_uuid = await get_reserved_ip_uuid(reserved_ip) + else: + reserved_ip_uuid = reserved_ip + await vultr_client.delete_reserved_ip(reserved_ip_uuid) + return f"Reserved IP {reserved_ip} deleted successfully" + + @mcp.tool + async def attach(reserved_ip: str, instance_id: str) -> str: + """Attach a reserved IP to an instance. + + Args: + reserved_ip: The reserved IP address (e.g., "192.168.1.1" or "2001:db8::1") + instance_id: The instance ID to attach to + + Returns: + Success message + + Note: The instance must be in the same region as the reserved IP. + """ + # Try to look up UUID if it looks like an IP address + if "." in reserved_ip or ":" in reserved_ip: + reserved_ip_uuid = await get_reserved_ip_uuid(reserved_ip) + else: + reserved_ip_uuid = reserved_ip + await vultr_client.attach_reserved_ip(reserved_ip_uuid, instance_id) + return f"Reserved IP {reserved_ip} attached to instance {instance_id}" + + @mcp.tool + async def detach(reserved_ip: str) -> str: + """Detach a reserved IP from its instance. + + Args: + reserved_ip: The reserved IP address to detach (e.g., "192.168.1.1" or "2001:db8::1") + + Returns: + Success message + """ + # Try to look up UUID if it looks like an IP address + if "." in reserved_ip or ":" in reserved_ip: + reserved_ip_uuid = await get_reserved_ip_uuid(reserved_ip) + else: + reserved_ip_uuid = reserved_ip + await vultr_client.detach_reserved_ip(reserved_ip_uuid) + return f"Reserved IP {reserved_ip} detached from instance" + + @mcp.tool + async def convert_instance_ip( + ip_address: str, + instance_id: str, + label: Optional[str] = None + ) -> Dict[str, Any]: + """Convert an existing instance IP to a reserved IP. + + Args: + ip_address: The IP address to convert + instance_id: The instance ID that owns the IP + label: Optional label for the reserved IP + + Returns: + Created reserved IP information + + This is useful when you want to keep an IP address even after + destroying the instance. The IP will be converted to a reserved IP + and remain attached to the instance. + """ + return await vultr_client.convert_instance_ip_to_reserved(ip_address, instance_id, label) + + @mcp.tool + async def list_by_region(region: str) -> List[Dict[str, Any]]: + """List all reserved IPs in a specific region. + + Args: + region: The region ID to filter by (e.g., "ewr", "lax") + + Returns: + List of reserved IPs in the specified region + """ + all_ips = await vultr_client.list_reserved_ips() + return [ip for ip in all_ips if ip.get("region") == region] + + @mcp.tool + async def list_unattached() -> List[Dict[str, Any]]: + """List all unattached reserved IPs. + + Returns: + List of reserved IPs that are not attached to any instance + """ + all_ips = await vultr_client.list_reserved_ips() + return [ip for ip in all_ips if not ip.get("instance_id")] + + @mcp.tool + async def list_attached() -> List[Dict[str, Any]]: + """List all attached reserved IPs. + + Returns: + List of reserved IPs that are attached to instances, + including the instance ID they're attached to + """ + all_ips = await vultr_client.list_reserved_ips() + return [ip for ip in all_ips if ip.get("instance_id")] + + return mcp \ No newline at end of file diff --git a/src/mcp_vultr/server.py b/src/mcp_vultr/server.py index 73bb64e..c432b31 100644 --- a/src/mcp_vultr/server.py +++ b/src/mcp_vultr/server.py @@ -209,7 +209,7 @@ class VultrDNSServer: # Zone file header lines.append(f"; Zone file for {domain}") - lines.append(f"; Generated by vultr-dns-mcp") + lines.append(f"; Generated by mcp-vultr") lines.append(f"$ORIGIN {domain}.") lines.append(f"$TTL 3600") lines.append("") @@ -437,6 +437,726 @@ class VultrDNSServer: "ttl": ttl, "priority": priority } + + async def list_backups(self) -> List[Dict[str, Any]]: + """ + List all backups in your account. + + Returns: + List of backup objects + """ + result = await self._make_request("GET", "/backups") + return result.get("backups", []) + + async def get_backup(self, backup_id: str) -> Dict[str, Any]: + """ + Get information about a specific backup. + + Args: + backup_id: The backup ID to get information for + + Returns: + Backup information + """ + return await self._make_request("GET", f"/backups/{backup_id}") + + async def list_ssh_keys(self) -> List[Dict[str, Any]]: + """ + List all SSH keys in your account. + + Returns: + List of SSH key objects + """ + result = await self._make_request("GET", "/ssh-keys") + return result.get("ssh_keys", []) + + async def get_ssh_key(self, ssh_key_id: str) -> Dict[str, Any]: + """ + Get information about a specific SSH key. + + Args: + ssh_key_id: The SSH key ID to get information for + + Returns: + SSH key information + """ + return await self._make_request("GET", f"/ssh-keys/{ssh_key_id}") + + async def create_ssh_key(self, name: str, ssh_key: str) -> Dict[str, Any]: + """ + Create a new SSH key. + + Args: + name: Name for the SSH key + ssh_key: The SSH public key + + Returns: + Created SSH key information + """ + data = { + "name": name, + "ssh_key": ssh_key + } + return await self._make_request("POST", "/ssh-keys", data=data) + + async def update_ssh_key(self, ssh_key_id: str, name: Optional[str] = None, ssh_key: Optional[str] = None) -> Dict[str, Any]: + """ + Update an existing SSH key. + + Args: + ssh_key_id: The SSH key ID to update + name: New name for the SSH key (optional) + ssh_key: New SSH public key (optional) + + Returns: + Updated SSH key information + """ + data = {} + if name is not None: + data["name"] = name + if ssh_key is not None: + data["ssh_key"] = ssh_key + + return await self._make_request("PATCH", f"/ssh-keys/{ssh_key_id}", data=data) + + async def delete_ssh_key(self, ssh_key_id: str) -> None: + """ + Delete an SSH key. + + Args: + ssh_key_id: The SSH key ID to delete + """ + await self._make_request("DELETE", f"/ssh-keys/{ssh_key_id}") + + # Instance management methods + async def list_instances(self) -> List[Dict[str, Any]]: + """ + List all instances in your account. + + Returns: + List of instance objects + """ + result = await self._make_request("GET", "/instances") + return result.get("instances", []) + + async def get_instance(self, instance_id: str) -> Dict[str, Any]: + """ + Get information about a specific instance. + + Args: + instance_id: The instance ID to get information for + + Returns: + Instance information + """ + return await self._make_request("GET", f"/instances/{instance_id}") + + async def create_instance( + self, + region: str, + plan: str, + label: Optional[str] = None, + os_id: Optional[int] = None, + iso_id: Optional[str] = None, + script_id: Optional[str] = None, + snapshot_id: Optional[str] = None, + enable_ipv6: bool = False, + enable_private_network: bool = False, + attach_private_network: Optional[List[str]] = None, + ssh_key_ids: Optional[List[str]] = None, + backups: bool = False, + app_id: Optional[int] = None, + user_data: Optional[str] = None, + ddos_protection: bool = False, + activation_email: bool = False, + hostname: Optional[str] = None, + tag: Optional[str] = None, + firewall_group_id: Optional[str] = None, + reserved_ipv4: Optional[str] = None + ) -> Dict[str, Any]: + """ + Create a new instance. + + Args: + region: Region code + plan: Plan ID + label: Label for the instance + os_id: Operating System ID + iso_id: ISO ID for custom installation + script_id: Startup script ID + snapshot_id: Snapshot ID to restore from + enable_ipv6: Enable IPv6 + enable_private_network: Enable private networking + attach_private_network: List of private network IDs to attach + ssh_key_ids: List of SSH key IDs to install + backups: Enable automatic backups + app_id: Application ID to install + user_data: Cloud-init user data + ddos_protection: Enable DDoS protection + activation_email: Send activation email + hostname: Hostname for the instance + tag: Tag for the instance + firewall_group_id: Firewall group ID + reserved_ipv4: Reserved IPv4 address to use + + Returns: + Created instance information + """ + data = { + "region": region, + "plan": plan + } + + # Add optional parameters + if label is not None: + data["label"] = label + if os_id is not None: + data["os_id"] = os_id + if iso_id is not None: + data["iso_id"] = iso_id + if script_id is not None: + data["script_id"] = script_id + if snapshot_id is not None: + data["snapshot_id"] = snapshot_id + if enable_ipv6: + data["enable_ipv6"] = enable_ipv6 + if enable_private_network: + data["enable_private_network"] = enable_private_network + if attach_private_network: + data["attach_private_network"] = attach_private_network + if ssh_key_ids: + data["sshkey_id"] = ssh_key_ids + if backups: + data["backups"] = "enabled" + if app_id is not None: + data["app_id"] = app_id + if user_data is not None: + data["user_data"] = user_data + if ddos_protection: + data["ddos_protection"] = ddos_protection + if activation_email: + data["activation_email"] = activation_email + if hostname is not None: + data["hostname"] = hostname + if tag is not None: + data["tag"] = tag + if firewall_group_id is not None: + data["firewall_group_id"] = firewall_group_id + if reserved_ipv4 is not None: + data["reserved_ipv4"] = reserved_ipv4 + + return await self._make_request("POST", "/instances", data=data) + + async def update_instance( + self, + instance_id: str, + label: Optional[str] = None, + tag: Optional[str] = None, + plan: Optional[str] = None, + enable_ipv6: Optional[bool] = None, + backups: Optional[bool] = None, + ddos_protection: Optional[bool] = None, + firewall_group_id: Optional[str] = None, + user_data: Optional[str] = None + ) -> Dict[str, Any]: + """ + Update an existing instance. + + Args: + instance_id: The instance ID to update + label: New label for the instance + tag: New tag for the instance + plan: New plan ID (for resizing) + enable_ipv6: Enable/disable IPv6 + backups: Enable/disable automatic backups + ddos_protection: Enable/disable DDoS protection + firewall_group_id: New firewall group ID + user_data: New cloud-init user data + + Returns: + Updated instance information + """ + data = {} + if label is not None: + data["label"] = label + if tag is not None: + data["tag"] = tag + if plan is not None: + data["plan"] = plan + if enable_ipv6 is not None: + data["enable_ipv6"] = enable_ipv6 + if backups is not None: + data["backups"] = "enabled" if backups else "disabled" + if ddos_protection is not None: + data["ddos_protection"] = ddos_protection + if firewall_group_id is not None: + data["firewall_group_id"] = firewall_group_id + if user_data is not None: + data["user_data"] = user_data + + return await self._make_request("PATCH", f"/instances/{instance_id}", data=data) + + async def delete_instance(self, instance_id: str) -> None: + """ + Delete an instance. + + Args: + instance_id: The instance ID to delete + """ + await self._make_request("DELETE", f"/instances/{instance_id}") + + async def start_instance(self, instance_id: str) -> None: + """ + Start a stopped instance. + + Args: + instance_id: The instance ID to start + """ + await self._make_request("POST", f"/instances/{instance_id}/start") + + async def stop_instance(self, instance_id: str) -> None: + """ + Stop a running instance. + + Args: + instance_id: The instance ID to stop + """ + await self._make_request("POST", f"/instances/{instance_id}/halt") + + async def reboot_instance(self, instance_id: str) -> None: + """ + Reboot an instance. + + Args: + instance_id: The instance ID to reboot + """ + await self._make_request("POST", f"/instances/{instance_id}/reboot") + + async def reinstall_instance(self, instance_id: str, hostname: Optional[str] = None) -> Dict[str, Any]: + """ + Reinstall an instance's operating system. + + Args: + instance_id: The instance ID to reinstall + hostname: New hostname for the instance + + Returns: + Reinstall status information + """ + data = {} + if hostname is not None: + data["hostname"] = hostname + + return await self._make_request("POST", f"/instances/{instance_id}/reinstall", data=data) + + async def get_instance_bandwidth(self, instance_id: str) -> Dict[str, Any]: + """ + Get bandwidth usage for an instance. + + Args: + instance_id: The instance ID + + Returns: + Bandwidth usage information + """ + return await self._make_request("GET", f"/instances/{instance_id}/bandwidth") + + async def list_instance_ipv4(self, instance_id: str) -> List[Dict[str, Any]]: + """ + List IPv4 addresses for an instance. + + Args: + instance_id: The instance ID + + Returns: + List of IPv4 addresses + """ + result = await self._make_request("GET", f"/instances/{instance_id}/ipv4") + return result.get("ipv4s", []) + + async def create_instance_ipv4(self, instance_id: str, reboot: bool = True) -> Dict[str, Any]: + """ + Create a new IPv4 address for an instance. + + Args: + instance_id: The instance ID + reboot: Whether to reboot the instance + + Returns: + Created IPv4 information + """ + data = {"reboot": reboot} + return await self._make_request("POST", f"/instances/{instance_id}/ipv4", data=data) + + async def delete_instance_ipv4(self, instance_id: str, ipv4: str) -> None: + """ + Delete an IPv4 address from an instance. + + Args: + instance_id: The instance ID + ipv4: The IPv4 address to delete + """ + await self._make_request("DELETE", f"/instances/{instance_id}/ipv4/{ipv4}") + + async def list_instance_ipv6(self, instance_id: str) -> List[Dict[str, Any]]: + """ + List IPv6 addresses for an instance. + + Args: + instance_id: The instance ID + + Returns: + List of IPv6 addresses + """ + result = await self._make_request("GET", f"/instances/{instance_id}/ipv6") + return result.get("ipv6s", []) + + # Firewall management methods + async def list_firewall_groups(self) -> List[Dict[str, Any]]: + """ + List all firewall groups in your account. + + Returns: + List of firewall group objects + """ + result = await self._make_request("GET", "/firewalls") + return result.get("firewall_groups", []) + + async def get_firewall_group(self, firewall_group_id: str) -> Dict[str, Any]: + """ + Get information about a specific firewall group. + + Args: + firewall_group_id: The firewall group ID to get information for + + Returns: + Firewall group information + """ + return await self._make_request("GET", f"/firewalls/{firewall_group_id}") + + async def create_firewall_group(self, description: str) -> Dict[str, Any]: + """ + Create a new firewall group. + + Args: + description: Description for the firewall group + + Returns: + Created firewall group information + """ + data = {"description": description} + return await self._make_request("POST", "/firewalls", data=data) + + async def update_firewall_group(self, firewall_group_id: str, description: str) -> None: + """ + Update a firewall group description. + + Args: + firewall_group_id: The firewall group ID to update + description: New description for the firewall group + """ + data = {"description": description} + await self._make_request("PUT", f"/firewalls/{firewall_group_id}", data=data) + + async def delete_firewall_group(self, firewall_group_id: str) -> None: + """ + Delete a firewall group. + + Args: + firewall_group_id: The firewall group ID to delete + """ + await self._make_request("DELETE", f"/firewalls/{firewall_group_id}") + + async def list_firewall_rules(self, firewall_group_id: str) -> List[Dict[str, Any]]: + """ + List all rules in a firewall group. + + Args: + firewall_group_id: The firewall group ID + + Returns: + List of firewall rules + """ + result = await self._make_request("GET", f"/firewalls/{firewall_group_id}/rules") + return result.get("firewall_rules", []) + + async def get_firewall_rule(self, firewall_group_id: str, firewall_rule_id: str) -> Dict[str, Any]: + """ + Get information about a specific firewall rule. + + Args: + firewall_group_id: The firewall group ID + firewall_rule_id: The firewall rule ID + + Returns: + Firewall rule information + """ + return await self._make_request("GET", f"/firewalls/{firewall_group_id}/rules/{firewall_rule_id}") + + async def create_firewall_rule( + self, + firewall_group_id: str, + ip_type: str, + protocol: str, + subnet: str, + subnet_size: int, + port: Optional[str] = None, + source: Optional[str] = None, + notes: Optional[str] = None + ) -> Dict[str, Any]: + """ + Create a new firewall rule. + + Args: + firewall_group_id: The firewall group ID + ip_type: IP type (v4 or v6) + protocol: Protocol (tcp, udp, icmp, gre) + subnet: IP subnet + subnet_size: Subnet size (0-32 for IPv4, 0-128 for IPv6) + port: Port or port range (e.g., "80" or "8000:8999") + source: Source type (e.g., "cloudflare") + notes: Notes for the rule + + Returns: + Created firewall rule information + """ + data = { + "ip_type": ip_type, + "protocol": protocol, + "subnet": subnet, + "subnet_size": subnet_size + } + + if port is not None: + data["port"] = port + if source is not None: + data["source"] = source + if notes is not None: + data["notes"] = notes + + return await self._make_request("POST", f"/firewalls/{firewall_group_id}/rules", data=data) + + async def delete_firewall_rule(self, firewall_group_id: str, firewall_rule_id: str) -> None: + """ + Delete a firewall rule. + + Args: + firewall_group_id: The firewall group ID + firewall_rule_id: The firewall rule ID to delete + """ + await self._make_request("DELETE", f"/firewalls/{firewall_group_id}/rules/{firewall_rule_id}") + + # Snapshot management methods + async def list_snapshots(self) -> List[Dict[str, Any]]: + """ + List all snapshots in your account. + + Returns: + List of snapshot objects + """ + result = await self._make_request("GET", "/snapshots") + return result.get("snapshots", []) + + async def get_snapshot(self, snapshot_id: str) -> Dict[str, Any]: + """ + Get information about a specific snapshot. + + Args: + snapshot_id: The snapshot ID to get information for + + Returns: + Snapshot information + """ + return await self._make_request("GET", f"/snapshots/{snapshot_id}") + + async def create_snapshot(self, instance_id: str, description: Optional[str] = None) -> Dict[str, Any]: + """ + Create a snapshot from an instance. + + Args: + instance_id: The instance ID to snapshot + description: Description for the snapshot + + Returns: + Created snapshot information + """ + data = {"instance_id": instance_id} + if description is not None: + data["description"] = description + + return await self._make_request("POST", "/snapshots", data=data) + + async def create_snapshot_from_url(self, url: str, description: Optional[str] = None) -> Dict[str, Any]: + """ + Create a snapshot from a URL. + + Args: + url: The URL of the snapshot to create + description: Description for the snapshot + + Returns: + Created snapshot information + """ + data = {"url": url} + if description is not None: + data["description"] = description + + return await self._make_request("POST", "/snapshots/create-from-url", data=data) + + async def update_snapshot(self, snapshot_id: str, description: str) -> None: + """ + Update a snapshot description. + + Args: + snapshot_id: The snapshot ID to update + description: New description for the snapshot + """ + data = {"description": description} + await self._make_request("PUT", f"/snapshots/{snapshot_id}", data=data) + + async def delete_snapshot(self, snapshot_id: str) -> None: + """ + Delete a snapshot. + + Args: + snapshot_id: The snapshot ID to delete + """ + await self._make_request("DELETE", f"/snapshots/{snapshot_id}") + + # Region information methods + async def list_regions(self) -> List[Dict[str, Any]]: + """ + List all available regions. + + Returns: + List of region objects + """ + result = await self._make_request("GET", "/regions") + return result.get("regions", []) + + async def list_availability(self, region_id: str) -> Dict[str, Any]: + """ + Get availability information for a specific region. + + Args: + region_id: The region ID to check availability for + + Returns: + Availability information including available plans + """ + return await self._make_request("GET", f"/regions/{region_id}/availability") + + # Reserved IP Methods + async def list_reserved_ips(self) -> List[Dict[str, Any]]: + """ + List all reserved IPs. + + Returns: + List of reserved IP objects + """ + result = await self._make_request("GET", "/reserved-ips") + return result.get("reserved_ips", []) + + async def get_reserved_ip(self, reserved_ip: str) -> Dict[str, Any]: + """ + Get details of a specific reserved IP. + + Args: + reserved_ip: The reserved IP address + + Returns: + Reserved IP details + """ + return await self._make_request("GET", f"/reserved-ips/{reserved_ip}") + + async def create_reserved_ip( + self, + region: str, + ip_type: str = "v4", + label: Optional[str] = None + ) -> Dict[str, Any]: + """ + Create a new reserved IP. + + Args: + region: The region ID where to reserve the IP + ip_type: Type of IP to reserve ("v4" or "v6") + label: Optional label for the reserved IP + + Returns: + Created reserved IP information + """ + data = { + "region": region, + "ip_type": ip_type + } + if label is not None: + data["label"] = label + + result = await self._make_request("POST", "/reserved-ips", data=data) + return result.get("reserved_ip", {}) + + async def update_reserved_ip(self, reserved_ip: str, label: str) -> None: + """ + Update a reserved IP's label. + + Args: + reserved_ip: The reserved IP address + label: New label for the reserved IP + """ + data = {"label": label} + await self._make_request("PATCH", f"/reserved-ips/{reserved_ip}", data=data) + + async def delete_reserved_ip(self, reserved_ip: str) -> None: + """ + Delete a reserved IP. + + Args: + reserved_ip: The reserved IP address to delete + """ + await self._make_request("DELETE", f"/reserved-ips/{reserved_ip}") + + async def attach_reserved_ip(self, reserved_ip: str, instance_id: str) -> None: + """ + Attach a reserved IP to an instance. + + Args: + reserved_ip: The reserved IP address + instance_id: The instance ID to attach to + """ + data = {"instance_id": instance_id} + await self._make_request("POST", f"/reserved-ips/{reserved_ip}/attach", data=data) + + async def detach_reserved_ip(self, reserved_ip: str) -> None: + """ + Detach a reserved IP from its instance. + + Args: + reserved_ip: The reserved IP address to detach + """ + await self._make_request("POST", f"/reserved-ips/{reserved_ip}/detach") + + async def convert_instance_ip_to_reserved(self, ip_address: str, instance_id: str, label: Optional[str] = None) -> Dict[str, Any]: + """ + Convert an instance IP to a reserved IP. + + Args: + ip_address: The IP address to convert + instance_id: The instance ID that owns the IP + label: Optional label for the reserved IP + + Returns: + Created reserved IP information + """ + data = { + "ip_address": ip_address, + "instance_id": instance_id + } + if label is not None: + data["label"] = label + + result = await self._make_request("POST", "/reserved-ips/convert", data=data) + return result.get("reserved_ip", {}) def create_mcp_server(api_key: Optional[str] = None) -> Server: @@ -461,7 +1181,7 @@ def create_mcp_server(api_key: Optional[str] = None) -> Server: ) # Initialize MCP server - server = Server("vultr-dns-mcp") + server = Server("mcp-vultr") # Initialize Vultr client vultr_client = VultrDNSServer(api_key) diff --git a/src/mcp_vultr/snapshots.py b/src/mcp_vultr/snapshots.py new file mode 100644 index 0000000..7baf7e1 --- /dev/null +++ b/src/mcp_vultr/snapshots.py @@ -0,0 +1,173 @@ +""" +Vultr Snapshots FastMCP Module. + +This module contains FastMCP tools and resources for managing Vultr snapshots. +""" + +from typing import Optional, List, Dict, Any +from fastmcp import FastMCP + + +def create_snapshots_mcp(vultr_client) -> FastMCP: + """ + Create a FastMCP instance for Vultr snapshots management. + + Args: + vultr_client: VultrDNSServer instance + + Returns: + Configured FastMCP instance with snapshot management tools + """ + mcp = FastMCP(name="vultr-snapshots") + + # Helper function to check if a string looks like a UUID + def is_uuid_format(s: str) -> bool: + """Check if a string looks like a UUID.""" + if len(s) == 36 and s.count('-') == 4: + return True + return False + + # Helper function to get snapshot ID from description + async def get_snapshot_id(identifier: str) -> str: + """ + Get the snapshot ID from a description or UUID. + + Args: + identifier: Snapshot description or UUID + + Returns: + The snapshot ID (UUID) + + Raises: + ValueError: If the snapshot is not found + """ + # If it looks like a UUID, return it as-is + if is_uuid_format(identifier): + return identifier + + # Otherwise, search for it by description + snapshots = await vultr_client.list_snapshots() + for snapshot in snapshots: + if snapshot.get("description") == identifier: + return snapshot["id"] + + raise ValueError(f"Snapshot '{identifier}' not found") + + # Snapshot resources + @mcp.resource("snapshots://list") + async def list_snapshots_resource() -> List[Dict[str, Any]]: + """List all snapshots in your Vultr account.""" + return await vultr_client.list_snapshots() + + @mcp.resource("snapshots://{snapshot_id}") + async def get_snapshot_resource(snapshot_id: str) -> Dict[str, Any]: + """Get information about a specific snapshot. + + Args: + snapshot_id: The snapshot ID or description + """ + actual_id = await get_snapshot_id(snapshot_id) + return await vultr_client.get_snapshot(actual_id) + + # Snapshot tools + @mcp.tool + async def list() -> List[Dict[str, Any]]: + """List all snapshots in your Vultr account. + + Returns: + List of snapshot objects with details including: + - id: Snapshot ID + - date_created: Creation date + - description: Snapshot description + - size: Size in bytes + - compressed_size: Compressed size in bytes + - status: Snapshot status + - os_id: Operating system ID + - app_id: Application ID + """ + return await vultr_client.list_snapshots() + + @mcp.tool + async def get(snapshot_id: str) -> Dict[str, Any]: + """Get information about a specific snapshot. + + Args: + snapshot_id: The snapshot ID or description (e.g., "backup-2024-01" or UUID) + + Returns: + Snapshot information including: + - id: Snapshot ID + - date_created: Creation date + - description: Snapshot description + - size: Size in bytes + - compressed_size: Compressed size in bytes + - status: Snapshot status + - os_id: Operating system ID + - app_id: Application ID + """ + actual_id = await get_snapshot_id(snapshot_id) + return await vultr_client.get_snapshot(actual_id) + + @mcp.tool + async def create(instance_id: str, description: Optional[str] = None) -> Dict[str, Any]: + """Create a snapshot from an instance. + + Args: + instance_id: The instance ID to snapshot + description: Description for the snapshot (optional) + + Returns: + Created snapshot information + + Note: Creating a snapshot may take several minutes depending on the instance size. + The snapshot will appear with status 'pending' initially. + """ + return await vultr_client.create_snapshot(instance_id, description) + + @mcp.tool + async def create_from_url(url: str, description: Optional[str] = None) -> Dict[str, Any]: + """Create a snapshot from a URL. + + Args: + url: The URL of the snapshot to create (must be a valid snapshot URL) + description: Description for the snapshot (optional) + + Returns: + Created snapshot information + + Note: The URL must point to a valid Vultr snapshot file. + """ + return await vultr_client.create_snapshot_from_url(url, description) + + @mcp.tool + async def update(snapshot_id: str, description: str) -> Dict[str, str]: + """Update a snapshot description. + + Args: + snapshot_id: The snapshot ID or description (e.g., "backup-2024-01" or UUID) + description: New description for the snapshot + + Returns: + Status message confirming update + """ + actual_id = await get_snapshot_id(snapshot_id) + await vultr_client.update_snapshot(actual_id, description) + return {"status": "success", "message": f"Snapshot {snapshot_id} updated successfully"} + + @mcp.tool + async def delete(snapshot_id: str) -> Dict[str, str]: + """Delete a snapshot. + + Args: + snapshot_id: The snapshot ID or description (e.g., "backup-2024-01" or UUID) + + Returns: + Status message confirming deletion + + Warning: This action cannot be undone! + """ + actual_id = await get_snapshot_id(snapshot_id) + await vultr_client.delete_snapshot(actual_id) + return {"status": "success", "message": f"Snapshot {snapshot_id} deleted successfully"} + + return mcp \ No newline at end of file diff --git a/src/mcp_vultr/ssh_keys.py b/src/mcp_vultr/ssh_keys.py new file mode 100644 index 0000000..af36c2f --- /dev/null +++ b/src/mcp_vultr/ssh_keys.py @@ -0,0 +1,149 @@ +""" +Vultr SSH Keys FastMCP Module. + +This module contains FastMCP tools and resources for managing Vultr SSH keys. +""" + +from typing import Optional, List, Dict, Any +from fastmcp import FastMCP + + +def create_ssh_keys_mcp(vultr_client) -> FastMCP: + """ + Create a FastMCP instance for Vultr SSH keys management. + + Args: + vultr_client: VultrDNSServer instance + + Returns: + Configured FastMCP instance with SSH key management tools + """ + mcp = FastMCP(name="vultr-ssh-keys") + + # Helper function to check if a string looks like a UUID + def is_uuid_format(s: str) -> bool: + """Check if a string looks like a UUID.""" + if len(s) == 36 and s.count('-') == 4: + return True + return False + + # Helper function to get SSH key ID from name + async def get_ssh_key_id(identifier: str) -> str: + """ + Get the SSH key ID from a name or UUID. + + Args: + identifier: SSH key name or UUID + + Returns: + The SSH key ID (UUID) + + Raises: + ValueError: If the SSH key is not found + """ + # If it looks like a UUID, return it as-is + if is_uuid_format(identifier): + return identifier + + # Otherwise, search for it by name + ssh_keys = await vultr_client.list_ssh_keys() + for key in ssh_keys: + if key.get("name") == identifier: + return key["id"] + + raise ValueError(f"SSH key '{identifier}' not found") + + # SSH Key resources + @mcp.resource("ssh-keys://list") + async def list_ssh_keys_resource() -> List[Dict[str, Any]]: + """List all SSH keys in your Vultr account.""" + return await vultr_client.list_ssh_keys() + + @mcp.resource("ssh-keys://{ssh_key_id}") + async def get_ssh_key_resource(ssh_key_id: str) -> Dict[str, Any]: + """Get information about a specific SSH key. + + Args: + ssh_key_id: The SSH key ID or name + """ + actual_id = await get_ssh_key_id(ssh_key_id) + return await vultr_client.get_ssh_key(actual_id) + + # SSH Key tools + @mcp.tool + async def list() -> List[Dict[str, Any]]: + """List all SSH keys in your Vultr account. + + Returns: + List of SSH key objects with details including: + - id: SSH key ID + - name: SSH key name + - ssh_key: The public SSH key + - date_created: Creation date + """ + return await vultr_client.list_ssh_keys() + + @mcp.tool + async def get(ssh_key_id: str) -> Dict[str, Any]: + """Get information about a specific SSH key. + + Args: + ssh_key_id: The SSH key ID or name (e.g., "my-laptop-key" or UUID) + + Returns: + SSH key information including: + - id: SSH key ID + - name: SSH key name + - ssh_key: The public SSH key + - date_created: Creation date + """ + actual_id = await get_ssh_key_id(ssh_key_id) + return await vultr_client.get_ssh_key(actual_id) + + @mcp.tool + async def create(name: str, ssh_key: str) -> Dict[str, Any]: + """Create a new SSH key. + + Args: + name: Name for the SSH key + ssh_key: The SSH public key (e.g., "ssh-rsa AAAAB3NzaC1yc2...") + + Returns: + Created SSH key information including: + - id: SSH key ID + - name: SSH key name + - ssh_key: The public SSH key + - date_created: Creation date + """ + return await vultr_client.create_ssh_key(name, ssh_key) + + @mcp.tool + async def update(ssh_key_id: str, name: Optional[str] = None, ssh_key: Optional[str] = None) -> Dict[str, Any]: + """Update an existing SSH key. + + Args: + ssh_key_id: The SSH key ID or name (e.g., "my-laptop-key" or UUID) + name: New name for the SSH key (optional) + ssh_key: New SSH public key (optional) + + Returns: + Updated SSH key information + """ + actual_id = await get_ssh_key_id(ssh_key_id) + return await vultr_client.update_ssh_key(actual_id, name, ssh_key) + + @mcp.tool + async def delete(ssh_key_id: str) -> Dict[str, str]: + """Delete an SSH key. + + Args: + ssh_key_id: The SSH key ID or name (e.g., "my-laptop-key" or UUID) + + Returns: + Status message confirming deletion + """ + actual_id = await get_ssh_key_id(ssh_key_id) + await vultr_client.delete_ssh_key(actual_id) + return {"status": "success", "message": f"SSH key {ssh_key_id} deleted successfully"} + + return mcp \ No newline at end of file diff --git a/sync_version.py b/sync_version.py index a4f995a..7ac0ef1 100644 --- a/sync_version.py +++ b/sync_version.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -Version synchronization script for vultr-dns-mcp. +Version synchronization script for mcp-vultr. This script ensures that the version number in pyproject.toml and src/mcp_vultr/_version.py are kept in sync. @@ -58,7 +58,7 @@ def update_version_py(new_version: str) -> None: """Update version in _version.py.""" version_path = Path("src/mcp_vultr/_version.py") - content = f'''"""Version information for vultr-dns-mcp package.""" + content = f'''"""Version information for mcp-vultr package.""" __version__ = "{new_version}" __version_info__ = tuple(int(i) for i in __version__.split(".") if i.isdigit()) diff --git a/test_improvements.py b/test_improvements.py index 6b4b15e..4518aa5 100644 --- a/test_improvements.py +++ b/test_improvements.py @@ -127,7 +127,7 @@ def simulate_domain_query(): print() print("๐Ÿ’ก To query real domains, set VULTR_API_KEY and run:") print(" export VULTR_API_KEY='your-api-key'") - print(" uv run vultr-dns-mcp domains list") + print(" uv run mcp-vultr domains list") print() if __name__ == "__main__": @@ -140,4 +140,4 @@ if __name__ == "__main__": simulate_domain_query() print("โœ… All improvement tests completed successfully!") - print("\n๐ŸŽ‰ Ready to use with: uv run vultr-dns-mcp --help") \ No newline at end of file + print("\n๐ŸŽ‰ Ready to use with: uv run mcp-vultr --help") \ No newline at end of file