diff --git a/src/esxi_mcp_server/mixins/vm_lifecycle.py b/src/esxi_mcp_server/mixins/vm_lifecycle.py index ca4c308..a23122e 100644 --- a/src/esxi_mcp_server/mixins/vm_lifecycle.py +++ b/src/esxi_mcp_server/mixins/vm_lifecycle.py @@ -127,12 +127,16 @@ class VMLifecycleMixin(MCPMixin): if not network_obj: raise ValueError(f"Network '{network}' not found") - # Build VM config spec + # Build VM config spec with required files property + vm_file_info = vim.vm.FileInfo( + vmPathName=f"[{datastore_obj.name}]" + ) vm_spec = vim.vm.ConfigSpec( name=name, memoryMB=memory_mb, numCPUs=cpu, guestId=guest_id, + files=vm_file_info, ) device_specs = [] @@ -157,9 +161,10 @@ class VMLifecycleMixin(MCPMixin): disk_spec.device.backing = vim.vm.device.VirtualDisk.FlatVer2BackingInfo() disk_spec.device.backing.diskMode = "persistent" disk_spec.device.backing.thinProvisioned = True - disk_spec.device.backing.datastore = datastore_obj + disk_spec.device.backing.fileName = f"[{datastore_obj.name}]" disk_spec.device.controllerKey = controller_spec.device.key disk_spec.device.unitNumber = 0 + disk_spec.device.key = -1 # Negative key for new device device_specs.append(disk_spec) # Add network adapter if network is available diff --git a/test_client.py b/test_client.py index 9ceb965..30f921a 100644 --- a/test_client.py +++ b/test_client.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -"""Simple MCP client to test the ESXi MCP server.""" +"""Comprehensive MCP client to test all read-only ESXi MCP server tools.""" import asyncio import json @@ -24,9 +24,74 @@ def load_env_file(path: str = ".env") -> dict[str, str]: return env +def print_result(data, indent=2, max_items=5): + """Pretty print result data with truncation.""" + if isinstance(data, list): + # Check for empty-result message + if data and isinstance(data[0], dict) and "message" in data[0] and "count" in data[0]: + print(f" {data[0]['message']}") + return + print(f" Found {len(data)} items:") + for item in data[:max_items]: + if isinstance(item, dict): + summary = ", ".join(f"{k}={v}" for k, v in list(item.items())[:4]) + print(f" - {summary[:100]}...") + else: + print(f" - {item}") + if len(data) > max_items: + print(f" ... and {len(data) - max_items} more") + elif isinstance(data, dict): + for k, v in list(data.items())[:8]: + val_str = str(v)[:60] + "..." if len(str(v)) > 60 else str(v) + print(f" {k}: {val_str}") + else: + print(f" {data}") + + +async def test_tool(session, name: str, args: dict = None, description: str = ""): + """Test a single tool and print results.""" + args = args or {} + print(f"\n{'─' * 60}") + print(f"Testing: {name} {description}") + print(f"{'─' * 60}") + try: + result = await session.call_tool(name, args) + if result.content: + data = json.loads(result.content[0].text) + print_result(data) + return data + else: + print(" No content returned") + return None + except Exception as e: + print(f" ERROR: {e}") + return None + + +async def test_resource(session, uri: str): + """Test reading an MCP resource.""" + print(f"\n{'─' * 60}") + print(f"Resource: {uri}") + print(f"{'─' * 60}") + try: + result = await session.read_resource(uri) + if result.contents: + data = json.loads(result.contents[0].text) + print_result(data) + return data + else: + print(" No content returned") + return None + except Exception as e: + print(f" ERROR: {e}") + return None + + async def main(): - """Test the ESXi MCP server.""" - print("šŸ”Œ Connecting to ESXi MCP server...") + """Test all read-only ESXi MCP server tools.""" + print("=" * 60) + print("ESXi MCP Server - Comprehensive Read-Only Test Suite") + print("=" * 60) # Load from .env file dotenv = load_env_file() @@ -40,260 +105,185 @@ async def main(): "VCENTER_USER": dotenv.get("VCENTER_USER", os.environ.get("VCENTER_USER", "")), "VCENTER_PASSWORD": dotenv.get("VCENTER_PASSWORD", os.environ.get("VCENTER_PASSWORD", "")), "VCENTER_INSECURE": dotenv.get("VCENTER_INSECURE", os.environ.get("VCENTER_INSECURE", "true")), - "MCP_TRANSPORT": "stdio", # Force stdio transport + "MCP_TRANSPORT": "stdio", } ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: - # Initialize await session.initialize() - print("āœ“ Connected!\n") + print("\nāœ“ Connected to ESXi MCP Server\n") - # List available tools - print("=== Available Tools ===") + # ───────────────────────────────────────────────────────────── + # List available tools and resources + # ───────────────────────────────────────────────────────────── tools_result = await session.list_tools() - print(f"Total: {len(tools_result.tools)} tools") - - # Group by category (based on tool name patterns) - categories = { - "VM Lifecycle": [], - "Power Ops": [], - "Snapshots": [], - "Guest Ops": [], - "Monitoring": [], - "Datastore": [], - "Disk Mgmt": [], - "NIC Mgmt": [], - "Host Mgmt": [], - "OVF/OVA": [], - "Other": [], - } - - for tool in tools_result.tools: - name = tool.name - if name in ["create_vm", "clone_vm", "delete_vm", "reconfigure_vm", "get_vm_info"]: - categories["VM Lifecycle"].append(name) - elif "power" in name or name in ["reset_vm", "suspend_vm"]: - categories["Power Ops"].append(name) - elif "snapshot" in name: - categories["Snapshots"].append(name) - elif "guest" in name or "execute" in name: - categories["Guest Ops"].append(name) - elif "metrics" in name or "events" in name or "alarms" in name: - categories["Monitoring"].append(name) - elif "datastore" in name or "browse" in name or "upload" in name or "download" in name: - categories["Datastore"].append(name) - elif "disk" in name or "iso" in name: - categories["Disk Mgmt"].append(name) - elif "nic" in name or "mac" in name: - categories["NIC Mgmt"].append(name) - elif "host" in name or "maintenance" in name or "service" in name or "ntp" in name: - categories["Host Mgmt"].append(name) - elif "ovf" in name: - categories["OVF/OVA"].append(name) - else: - categories["Other"].append(name) - - for cat, tools in categories.items(): - if tools: - print(f" {cat}: {len(tools)} tools") - print() - - # List available resources - print("=== Available Resources ===") resources_result = await session.list_resources() - for r in resources_result.resources: - print(f" • {r.uri} ({r.name})") - print() + print(f"Available: {len(tools_result.tools)} tools, {len(resources_result.resources)} resources") - # Test 1: Browse datastore - print("=== Test 1: Browse Datastore ===") - result = await session.call_tool("browse_datastore", { - "datastore": "c1_ds-02", - "path": "iso" - }) - files = json.loads(result.content[0].text) - print(f"Found {len(files)} files in iso/") - for f in files[:5]: - print(f" {f['name']}: {f['size_human']}") - print() + # ───────────────────────────────────────────────────────────── + # Test MCP Resources + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 1: MCP Resources") + print("=" * 60) - # Test 2: Read VMs resource - print("=== Test 2: Read Resource (esxi://vms) ===") - try: - result = await session.read_resource("esxi://vms") - vms = json.loads(result.contents[0].text) - print(f"Found {len(vms)} VMs:") - for vm in vms[:5]: - print(f" • {vm['name']}: {vm['power_state']}") - except Exception as e: - print(f"Resource read error: {e}") - print() + vms = await test_resource(session, "esxi://vms") + await test_resource(session, "esxi://hosts") + await test_resource(session, "esxi://datastores") + await test_resource(session, "esxi://networks") + await test_resource(session, "esxi://clusters") - # Test 3: Get Host Info (NEW!) - print("=== Test 3: Get Host Info (NEW!) ===") - try: - result = await session.call_tool("get_host_info", {}) - host_info = json.loads(result.content[0].text) - print(f" Host: {host_info.get('name', 'N/A')}") - prod = host_info.get('product', {}) - print(f" ESXi: {prod.get('version', 'N/A')} (build {prod.get('build', 'N/A')})") - hw = host_info.get('hardware', {}) - print(f" CPU: {hw.get('cpu_cores', 'N/A')} cores @ {hw.get('cpu_mhz', 'N/A')} MHz") - print(f" RAM: {hw.get('memory_gb', 'N/A')} GB") - status = host_info.get('status', {}) - print(f" Maintenance: {status.get('maintenance_mode', 'N/A')}") - except Exception as e: - print(f"Error: {e}") - print() + # Get a VM name for subsequent tests + vm_name = vms[0]["name"] if vms else None + print(f"\n>>> Using VM '{vm_name}' for subsequent tests") - # Test 4: List Services (NEW!) - print("=== Test 4: List Services (NEW!) ===") - try: - result = await session.call_tool("list_services", {}) - services = json.loads(result.content[0].text) - running = [s for s in services if s.get('running')] - print(f"Found {len(services)} services ({len(running)} running):") - for s in running[:8]: - print(f" āœ“ {s['key']}: {s['label']}") - except Exception as e: - print(f"Error: {e}") - print() + # ───────────────────────────────────────────────────────────── + # VM Lifecycle (read-only) + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 2: VM Lifecycle Tools") + print("=" * 60) - # Test 5: Get NTP Config (NEW!) - print("=== Test 5: Get NTP Config (NEW!) ===") - try: - result = await session.call_tool("get_ntp_config", {}) - ntp = json.loads(result.content[0].text) - print(f" NTP Servers: {ntp.get('ntp_servers', [])}") - print(f" Service Running: {ntp.get('service_running', 'N/A')}") - print(f" Timezone: {ntp.get('timezone', 'N/A')}") - except Exception as e: - print(f"Error: {e}") - print() + await test_tool(session, "list_vms") + if vm_name: + await test_tool(session, "get_vm_info", {"name": vm_name}) - # Test 6: List VM Disks (if there are VMs) - print("=== Test 6: List VM Disks (NEW!) ===") - try: - # First get a VM name - result = await session.read_resource("esxi://vms") - vms = json.loads(result.contents[0].text) - if vms: - vm_name = vms[0]['name'] - print(f" Checking disks on: {vm_name}") - result = await session.call_tool("list_disks", {"vm_name": vm_name}) - disks = json.loads(result.content[0].text) - for d in disks: - print(f" • {d['label']}: {d.get('size_gb', 'N/A')} GB") - else: - print(" No VMs found to check") - except Exception as e: - print(f"Error: {e}") - print() + # ───────────────────────────────────────────────────────────── + # Monitoring Tools + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 3: Monitoring Tools") + print("=" * 60) - # Test 7: List VM NICs (NEW!) - print("=== Test 7: List VM NICs (NEW!) ===") - try: - if vms: - vm_name = vms[0]['name'] - print(f" Checking NICs on: {vm_name}") - result = await session.call_tool("list_nics", {"vm_name": vm_name}) - nics = json.loads(result.content[0].text) - for n in nics: - print(f" • {n['label']}: {n.get('type', 'N/A')} - {n.get('network', 'N/A')}") - print(f" MAC: {n.get('mac_address', 'N/A')}, Connected: {n.get('connected', 'N/A')}") - except Exception as e: - print(f"Error: {e}") - print() + await test_tool(session, "list_hosts") + await test_tool(session, "get_host_stats") + if vm_name: + await test_tool(session, "get_vm_stats", {"name": vm_name}) + await test_tool(session, "get_alarms") + await test_tool(session, "get_recent_events", {"count": 5}) + await test_tool(session, "get_recent_tasks", {"count": 5}) - # ───────────────────────────────────────────────────────────────────────── - # vCenter-Specific Tests (NEW!) - # ───────────────────────────────────────────────────────────────────────── + # ───────────────────────────────────────────────────────────── + # Host Management Tools + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 4: Host Management Tools") + print("=" * 60) - # Test 8: List Folders - print("=== Test 8: List Folders (vCenter) ===") - try: - result = await session.call_tool("list_folders", {}) - folders = json.loads(result.content[0].text) - print(f"Found {len(folders)} folders:") - for f in folders[:10]: - print(f" šŸ“ {f['path']} ({f.get('children', 0)} children)") - except Exception as e: - print(f"Error: {e}") - print() + await test_tool(session, "get_host_info") + await test_tool(session, "get_host_hardware") + await test_tool(session, "get_host_networking") + await test_tool(session, "list_services") + await test_tool(session, "get_ntp_config") - # Test 9: List Clusters - print("=== Test 9: List Clusters (vCenter) ===") - try: - result = await session.call_tool("list_clusters", {}) - clusters = json.loads(result.content[0].text) - # Check if it's an empty-result message - if clusters and 'message' in clusters[0]: - print(f" {clusters[0]['message']}") - elif clusters: - print(f"Found {len(clusters)} clusters:") - for c in clusters: - print(f" šŸ–„ļø {c['name']}: {c.get('host_count', 0)} hosts") - print(f" DRS: {c.get('drs', {}).get('enabled', 'N/A')}, HA: {c.get('ha', {}).get('enabled', 'N/A')}") - else: - print(" No clusters found (standalone host mode)") - except Exception as e: - print(f"Error: {e}") - print() + # ───────────────────────────────────────────────────────────── + # Datastore/Resources Tools + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 5: Datastore & Resource Tools") + print("=" * 60) - # Test 10: List Recent Tasks - print("=== Test 10: Recent Tasks (vCenter) ===") - try: - result = await session.call_tool("list_recent_tasks", {"max_count": 5}) - tasks = json.loads(result.content[0].text) - # Check if it's an empty-result message - if tasks and 'message' in tasks[0]: - print(f" {tasks[0]['message']}") - else: - print(f"Found {len(tasks)} recent tasks:") - for t in tasks[:5]: - entity = t.get('entity', 'N/A') - print(f" šŸ“‹ {t.get('name', 'N/A')} - {t.get('state', 'N/A')}") - print(f" Entity: {entity} | Started: {t.get('start_time', 'N/A')}") - except Exception as e: - print(f"Error: {e}") - print() + # Get datastore name from resources + ds_result = await session.read_resource("esxi://datastores") + datastores = json.loads(ds_result.contents[0].text) if ds_result.contents else [] + ds_name = datastores[0]["name"] if datastores else None + print(f"\n>>> Using datastore '{ds_name}' for tests") - # Test 11: Get Alarms - print("=== Test 11: Get Alarms (vCenter) ===") - try: - result = await session.call_tool("get_alarms", {}) - alarms = json.loads(result.content[0].text) - if alarms: - print(f"Found {len(alarms)} active alarms:") - for a in alarms[:5]: - print(f" āš ļø {a.get('alarm', 'N/A')} on {a.get('entity', 'N/A')}") - print(f" Status: {a.get('status', 'N/A')}, Acknowledged: {a.get('acknowledged', 'N/A')}") - else: - print(" No active alarms (all clear!)") - except Exception as e: - print(f"Error: {e}") - print() + if ds_name: + await test_tool(session, "get_datastore_info", {"name": ds_name}) + await test_tool(session, "browse_datastore", {"datastore": ds_name, "path": ""}) - # Test 12: List Recent Events - print("=== Test 12: Recent Events (vCenter) ===") - try: - result = await session.call_tool("list_recent_events", {"max_count": 5, "hours_back": 24}) - events = json.loads(result.content[0].text) - print(f"Found {len(events)} events in last 24h:") - for e in events[:5]: - vm_info = f" (VM: {e['vm']})" if e.get('vm') else "" - print(f" šŸ“Œ {e.get('type', 'N/A')}{vm_info}") - msg = e.get('message', 'N/A') - if len(msg) > 80: - msg = msg[:77] + "..." - print(f" {msg}") - except Exception as e: - print(f"Error: {e}") - print() + await test_tool(session, "get_vcenter_info") + await test_tool(session, "get_resource_pool_info") - print("āœ… All tests completed!") + # Get network name + net_result = await session.read_resource("esxi://networks") + networks = json.loads(net_result.contents[0].text) if net_result.contents else [] + net_name = networks[0]["name"] if networks else None + if net_name: + await test_tool(session, "get_network_info", {"name": net_name}) + + await test_tool(session, "list_templates") + + # ───────────────────────────────────────────────────────────── + # Disk Management Tools + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 6: Disk Management Tools") + print("=" * 60) + + if vm_name: + await test_tool(session, "list_disks", {"vm_name": vm_name}) + + # ───────────────────────────────────────────────────────────── + # NIC Management Tools + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 7: NIC Management Tools") + print("=" * 60) + + if vm_name: + await test_tool(session, "list_nics", {"vm_name": vm_name}) + + # ───────────────────────────────────────────────────────────── + # Snapshot Tools + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 8: Snapshot Tools") + print("=" * 60) + + if vm_name: + await test_tool(session, "list_snapshots", {"name": vm_name}) + + # ───────────────────────────────────────────────────────────── + # OVF Tools + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 9: OVF Tools") + print("=" * 60) + + await test_tool(session, "list_ovf_networks") + + # ───────────────────────────────────────────────────────────── + # vCenter-Specific Tools + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 10: vCenter-Specific Tools") + print("=" * 60) + + await test_tool(session, "list_folders") + await test_tool(session, "list_clusters") + await test_tool(session, "list_recent_tasks", {"max_count": 5}) + await test_tool(session, "list_recent_events", {"max_count": 5, "hours_back": 24}) + + # ───────────────────────────────────────────────────────────── + # Guest Operations (require VMware Tools + credentials) + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("SECTION 11: Guest Operations (may fail without VMware Tools)") + print("=" * 60) + + # These typically need a running VM with VMware Tools + # and guest credentials - expect failures on most VMs + if vm_name: + await test_tool( + session, "list_guest_processes", + {"name": vm_name, "username": "root", "password": "test"}, + "(expected to fail without valid credentials)" + ) + + # ───────────────────────────────────────────────────────────── + # Summary + # ───────────────────────────────────────────────────────────── + print("\n" + "=" * 60) + print("TEST SUMMARY") + print("=" * 60) + print(f"āœ… Read-only test suite completed") + print(f" Tools available: {len(tools_result.tools)}") + print(f" Resources available: {len(resources_result.resources)}") + print(f"\nNote: Guest operations require VMware Tools + valid credentials") + print("Note: Some vCenter tools return empty on standalone hosts") if __name__ == "__main__": diff --git a/test_destructive.py b/test_destructive.py new file mode 100644 index 0000000..77fc691 --- /dev/null +++ b/test_destructive.py @@ -0,0 +1,503 @@ +#!/usr/bin/env python3 +"""Destructive test suite for ESXi MCP server - creates/modifies/deletes resources. + +WARNING: This test creates real VMs and modifies infrastructure! +Only run in a test environment. + +Usage: + python test_destructive.py [--skip-cleanup] + + --skip-cleanup: Leave test VM for inspection (default: cleanup) +""" + +import asyncio +import json +import os +import sys +from datetime import datetime +from pathlib import Path + +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client + +# Test configuration +TEST_VM_NAME = f"mcp-test-{datetime.now().strftime('%Y%m%d-%H%M%S')}" +TEST_FOLDER_NAME = f"mcp-test-folder-{datetime.now().strftime('%H%M%S')}" +SKIP_CLEANUP = "--skip-cleanup" in sys.argv + + +def load_env_file(path: str = ".env") -> dict[str, str]: + """Load environment variables from a .env file.""" + env = {} + env_path = Path(path) + if env_path.exists(): + with open(env_path) as f: + for line in f: + line = line.strip() + if line and not line.startswith("#") and "=" in line: + key, _, value = line.partition("=") + env[key.strip()] = value.strip() + return env + + +class TestResult: + """Track test results.""" + def __init__(self): + self.passed = 0 + self.failed = 0 + self.skipped = 0 + self.errors = [] + + def record(self, name: str, success: bool, error: str = None): + if success: + self.passed += 1 + print(f" āœ… {name}") + else: + self.failed += 1 + self.errors.append((name, error)) + print(f" āŒ {name}: {error}") + + def skip(self, name: str, reason: str): + self.skipped += 1 + print(f" ā­ļø {name}: {reason}") + + def summary(self): + total = self.passed + self.failed + self.skipped + print(f"\n{'=' * 60}") + print("DESTRUCTIVE TEST SUMMARY") + print(f"{'=' * 60}") + print(f" Passed: {self.passed}/{total}") + print(f" Failed: {self.failed}/{total}") + print(f" Skipped: {self.skipped}/{total}") + if self.errors: + print(f"\nErrors:") + for name, error in self.errors: + print(f" - {name}: {error}") + return self.failed == 0 + + +async def call_tool(session, name: str, args: dict = None) -> tuple[bool, any]: + """Call a tool and return (success, result).""" + args = args or {} + try: + result = await session.call_tool(name, args) + if result.content: + text = result.content[0].text + # Try to parse as JSON, fall back to plain text + try: + data = json.loads(text) + except json.JSONDecodeError: + data = text + return True, data + return True, None + except Exception as e: + return False, str(e) + + +async def main(): + """Run destructive tests.""" + print("=" * 60) + print("ESXi MCP Server - DESTRUCTIVE Test Suite") + print("=" * 60) + print(f"\nāš ļø WARNING: This test will CREATE and MODIFY resources!") + print(f" Test VM: {TEST_VM_NAME}") + print(f" Cleanup: {'DISABLED' if SKIP_CLEANUP else 'ENABLED'}") + print() + + results = TestResult() + dotenv = load_env_file() + + # Get datastore and network from env or use defaults + default_datastore = dotenv.get("VCENTER_DATASTORE", "datastore1") + default_network = dotenv.get("VCENTER_NETWORK", "VM Network") + + server_params = StdioServerParameters( + command="uv", + args=["run", "esxi-mcp-server"], + env={ + **os.environ, + "VCENTER_HOST": dotenv.get("VCENTER_HOST", ""), + "VCENTER_USER": dotenv.get("VCENTER_USER", ""), + "VCENTER_PASSWORD": dotenv.get("VCENTER_PASSWORD", ""), + "VCENTER_INSECURE": dotenv.get("VCENTER_INSECURE", "true"), + "MCP_TRANSPORT": "stdio", + } + ) + + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + print("āœ“ Connected to ESXi MCP Server\n") + + # Get available datastores and networks for test + ds_result = await session.read_resource("esxi://datastores") + datastores = json.loads(ds_result.contents[0].text) if ds_result.contents else [] + datastore = datastores[0]["name"] if datastores else default_datastore + + net_result = await session.read_resource("esxi://networks") + networks = json.loads(net_result.contents[0].text) if net_result.contents else [] + network = networks[0]["name"] if networks else default_network + + print(f"Using datastore: {datastore}") + print(f"Using network: {network}") + + # ───────────────────────────────────────────────────────────── + # SECTION 1: VM Lifecycle + # ───────────────────────────────────────────────────────────── + print(f"\n{'=' * 60}") + print("SECTION 1: VM Lifecycle") + print(f"{'=' * 60}") + + # Create VM + print(f"\n>>> Creating test VM: {TEST_VM_NAME}") + success, data = await call_tool(session, "create_vm", { + "name": TEST_VM_NAME, + "cpu": 1, + "memory_mb": 512, + "disk_gb": 1, + "guest_id": "otherGuest64", + "datastore": datastore, + "network": network, + }) + results.record("create_vm", success, data if not success else None) + + if not success: + print("\nāŒ Cannot continue without test VM. Aborting.") + results.summary() + return + + # Get VM info + success, data = await call_tool(session, "get_vm_info", {"name": TEST_VM_NAME}) + results.record("get_vm_info (new VM)", success, data if not success else None) + + # Rename VM (and rename back) + new_name = f"{TEST_VM_NAME}-renamed" + success, data = await call_tool(session, "rename_vm", { + "name": TEST_VM_NAME, + "new_name": new_name, + }) + results.record("rename_vm", success, data if not success else None) + + if success: + # Rename back + await call_tool(session, "rename_vm", {"name": new_name, "new_name": TEST_VM_NAME}) + + # Reconfigure VM + success, data = await call_tool(session, "reconfigure_vm", { + "name": TEST_VM_NAME, + "memory_mb": 1024, + }) + results.record("reconfigure_vm (memory)", success, data if not success else None) + + # ───────────────────────────────────────────────────────────── + # SECTION 2: Power Operations + # ───────────────────────────────────────────────────────────── + print(f"\n{'=' * 60}") + print("SECTION 2: Power Operations") + print(f"{'=' * 60}") + + # Power on + success, data = await call_tool(session, "power_on", {"name": TEST_VM_NAME}) + results.record("power_on", success, data if not success else None) + + if success: + # Wait a moment for power state to stabilize + await asyncio.sleep(3) + + # Suspend + success, data = await call_tool(session, "suspend_vm", {"name": TEST_VM_NAME}) + results.record("suspend_vm", success, data if not success else None) + + if success: + await asyncio.sleep(2) + # Power on again to test power off + await call_tool(session, "power_on", {"name": TEST_VM_NAME}) + await asyncio.sleep(2) + + # Power off + success, data = await call_tool(session, "power_off", {"name": TEST_VM_NAME}) + results.record("power_off", success, data if not success else None) + else: + results.skip("suspend_vm", "power_on failed") + results.skip("power_off", "power_on failed") + + # Ensure VM is off for disk operations + await call_tool(session, "power_off", {"name": TEST_VM_NAME}) + await asyncio.sleep(2) + + # ───────────────────────────────────────────────────────────── + # SECTION 3: Disk Management + # ───────────────────────────────────────────────────────────── + print(f"\n{'=' * 60}") + print("SECTION 3: Disk Management") + print(f"{'=' * 60}") + + # Add disk + success, data = await call_tool(session, "add_disk", { + "vm_name": TEST_VM_NAME, + "size_gb": 1, + "thin_provisioned": True, + }) + results.record("add_disk", success, data if not success else None) + + # List disks + success, data = await call_tool(session, "list_disks", {"vm_name": TEST_VM_NAME}) + results.record("list_disks (after add)", success, data if not success else None) + disk_count = len(data) if success and isinstance(data, list) else 0 + + # Extend disk + if disk_count > 0: + success, data = await call_tool(session, "extend_disk", { + "vm_name": TEST_VM_NAME, + "disk_label": "Hard disk 1", + "new_size_gb": 2, + }) + results.record("extend_disk", success, data if not success else None) + + # Remove the added disk (Hard disk 2) + if disk_count >= 2: + success, data = await call_tool(session, "remove_disk", { + "vm_name": TEST_VM_NAME, + "disk_label": "Hard disk 2", + }) + results.record("remove_disk", success, data if not success else None) + else: + results.skip("remove_disk", "Not enough disks") + + # ───────────────────────────────────────────────────────────── + # SECTION 4: NIC Management + # ───────────────────────────────────────────────────────────── + print(f"\n{'=' * 60}") + print("SECTION 4: NIC Management") + print(f"{'=' * 60}") + + # Add NIC + success, data = await call_tool(session, "add_nic", { + "vm_name": TEST_VM_NAME, + "network": network, + "nic_type": "vmxnet3", + }) + results.record("add_nic", success, data if not success else None) + + # List NICs + success, data = await call_tool(session, "list_nics", {"vm_name": TEST_VM_NAME}) + results.record("list_nics (after add)", success, data if not success else None) + nic_count = len(data) if success and isinstance(data, list) else 0 + + # Connect/disconnect NIC + if nic_count > 0: + success, data = await call_tool(session, "connect_nic", { + "vm_name": TEST_VM_NAME, + "nic_label": "Network adapter 1", + "connected": False, + }) + results.record("connect_nic (disconnect)", success, data if not success else None) + + # Remove added NIC (Network adapter 2) + if nic_count >= 2: + success, data = await call_tool(session, "remove_nic", { + "vm_name": TEST_VM_NAME, + "nic_label": "Network adapter 2", + }) + results.record("remove_nic", success, data if not success else None) + else: + results.skip("remove_nic", "Not enough NICs") + + # ───────────────────────────────────────────────────────────── + # SECTION 5: Snapshots + # ───────────────────────────────────────────────────────────── + print(f"\n{'=' * 60}") + print("SECTION 5: Snapshots") + print(f"{'=' * 60}") + + # Create snapshot + success, data = await call_tool(session, "create_snapshot", { + "name": TEST_VM_NAME, + "snapshot_name": "test-snapshot-1", + "description": "MCP test snapshot", + }) + results.record("create_snapshot", success, data if not success else None) + + # List snapshots + success, data = await call_tool(session, "list_snapshots", {"name": TEST_VM_NAME}) + results.record("list_snapshots", success, data if not success else None) + + # Rename snapshot + success, data = await call_tool(session, "rename_snapshot", { + "name": TEST_VM_NAME, + "snapshot_name": "test-snapshot-1", + "new_name": "renamed-snapshot", + "new_description": "Renamed by MCP test", + }) + results.record("rename_snapshot", success, data if not success else None) + + # Revert to snapshot + success, data = await call_tool(session, "revert_to_snapshot", { + "name": TEST_VM_NAME, + "snapshot_name": "renamed-snapshot", + }) + results.record("revert_to_snapshot", success, data if not success else None) + + # Delete snapshot + success, data = await call_tool(session, "delete_snapshot", { + "name": TEST_VM_NAME, + "snapshot_name": "renamed-snapshot", + }) + results.record("delete_snapshot", success, data if not success else None) + + # ───────────────────────────────────────────────────────────── + # SECTION 6: Folder Operations (vCenter) + # ───────────────────────────────────────────────────────────── + print(f"\n{'=' * 60}") + print("SECTION 6: Folder Operations (vCenter)") + print(f"{'=' * 60}") + + # Create folder + success, data = await call_tool(session, "create_folder", { + "folder_name": TEST_FOLDER_NAME, + }) + results.record("create_folder", success, data if not success else None) + folder_created = success + + # Move VM to folder + if folder_created: + success, data = await call_tool(session, "move_vm_to_folder", { + "vm_name": TEST_VM_NAME, + "folder_path": f"vm/{TEST_FOLDER_NAME}", + }) + results.record("move_vm_to_folder", success, data if not success else None) + + # Move back to root for cleanup + if success: + await call_tool(session, "move_vm_to_folder", { + "vm_name": TEST_VM_NAME, + "folder_path": "vm", + }) + else: + results.skip("move_vm_to_folder", "folder creation failed") + + # ───────────────────────────────────────────────────────────── + # SECTION 7: Datastore Operations + # ───────────────────────────────────────────────────────────── + print(f"\n{'=' * 60}") + print("SECTION 7: Datastore Operations") + print(f"{'=' * 60}") + + # Create folder in datastore + test_ds_folder = f"mcp-test-{datetime.now().strftime('%H%M%S')}" + success, data = await call_tool(session, "create_datastore_folder", { + "datastore": datastore, + "path": test_ds_folder, + }) + results.record("create_datastore_folder", success, data if not success else None) + ds_folder_created = success + + # Delete datastore folder + if ds_folder_created: + success, data = await call_tool(session, "delete_datastore_file", { + "datastore": datastore, + "path": test_ds_folder, + }) + results.record("delete_datastore_file (folder)", success, data if not success else None) + else: + results.skip("delete_datastore_file", "folder creation failed") + + # ───────────────────────────────────────────────────────────── + # SECTION 8: vCenter Advanced Operations + # ───────────────────────────────────────────────────────────── + print(f"\n{'=' * 60}") + print("SECTION 8: vCenter Advanced Operations") + print(f"{'=' * 60}") + + # Storage vMotion - move VM to different datastore + # Get list of datastores to find a second one + if len(datastores) >= 2: + # Find a different datastore + other_datastore = None + for ds in datastores: + if ds["name"] != datastore: + other_datastore = ds["name"] + break + + if other_datastore: + print(f"\n>>> Storage vMotion: {datastore} → {other_datastore}") + success, data = await call_tool(session, "storage_vmotion", { + "vm_name": TEST_VM_NAME, + "target_datastore": other_datastore, + }) + results.record("storage_vmotion", success, data if not success else None) + + # Move back to original datastore + if success: + await call_tool(session, "storage_vmotion", { + "vm_name": TEST_VM_NAME, + "target_datastore": datastore, + }) + else: + results.skip("storage_vmotion", "No alternate datastore found") + else: + results.skip("storage_vmotion", "Only one datastore available") + + # Convert VM to template + print(f"\n>>> Converting VM to template: {TEST_VM_NAME}") + success, data = await call_tool(session, "convert_to_template", { + "vm_name": TEST_VM_NAME, + }) + results.record("convert_to_template", success, data if not success else None) + is_template = success + + # Deploy from template + deployed_vm_name = f"{TEST_VM_NAME}-deployed" + if is_template: + success, data = await call_tool(session, "deploy_from_template", { + "template_name": TEST_VM_NAME, + "new_vm_name": deployed_vm_name, + "datastore": datastore, + }) + results.record("deploy_from_template", success, data if not success else None) + deployed_vm_created = success + + # Clean up deployed VM + if deployed_vm_created: + await call_tool(session, "delete_vm", {"name": deployed_vm_name}) + else: + results.skip("deploy_from_template", "template conversion failed") + + # Convert template back to VM + if is_template: + success, data = await call_tool(session, "convert_to_vm", { + "template_name": TEST_VM_NAME, + }) + results.record("convert_to_vm", success, data if not success else None) + else: + results.skip("convert_to_vm", "template conversion failed") + + # ───────────────────────────────────────────────────────────── + # CLEANUP + # ───────────────────────────────────────────────────────────── + print(f"\n{'=' * 60}") + print("CLEANUP") + print(f"{'=' * 60}") + + if SKIP_CLEANUP: + print(f"\nāš ļø Cleanup SKIPPED. Test VM '{TEST_VM_NAME}' remains.") + if folder_created: + print(f" Test folder '{TEST_FOLDER_NAME}' remains.") + else: + # Delete test VM + print(f"\n>>> Deleting test VM: {TEST_VM_NAME}") + success, data = await call_tool(session, "delete_vm", {"name": TEST_VM_NAME}) + results.record("delete_vm (cleanup)", success, data if not success else None) + + # Note: Folder deletion would require empty folder + # In a real scenario, you'd need to handle this + if folder_created: + print(f" Note: Test folder '{TEST_FOLDER_NAME}' may need manual cleanup") + + # Print summary + return results.summary() + + +if __name__ == "__main__": + success = asyncio.run(main()) + sys.exit(0 if success else 1)