add comprehensive test suites and fix VM creation
- Fix create_vm: add required files property to ConfigSpec
- Fix disk backing: use fileName instead of datastore reference
- test_client.py: comprehensive read-only test suite (86 tools, 6 resources)
- test_destructive.py: destructive test suite covering 29 operations:
- VM lifecycle (create, info, rename, reconfigure)
- Power operations (on, suspend, off)
- Disk management (add, list, extend, remove)
- NIC management (add, list, connect, remove)
- Snapshots (create, list, rename, revert, delete)
- Folder operations (create, move VM)
- Datastore operations (create folder, delete)
- vCenter advanced (storage_vmotion, convert_to_template,
deploy_from_template, convert_to_vm)
This commit is contained in:
parent
b9c411fdf9
commit
7918a78bfa
@ -127,12 +127,16 @@ class VMLifecycleMixin(MCPMixin):
|
|||||||
if not network_obj:
|
if not network_obj:
|
||||||
raise ValueError(f"Network '{network}' not found")
|
raise ValueError(f"Network '{network}' not found")
|
||||||
|
|
||||||
# Build VM config spec
|
# Build VM config spec with required files property
|
||||||
|
vm_file_info = vim.vm.FileInfo(
|
||||||
|
vmPathName=f"[{datastore_obj.name}]"
|
||||||
|
)
|
||||||
vm_spec = vim.vm.ConfigSpec(
|
vm_spec = vim.vm.ConfigSpec(
|
||||||
name=name,
|
name=name,
|
||||||
memoryMB=memory_mb,
|
memoryMB=memory_mb,
|
||||||
numCPUs=cpu,
|
numCPUs=cpu,
|
||||||
guestId=guest_id,
|
guestId=guest_id,
|
||||||
|
files=vm_file_info,
|
||||||
)
|
)
|
||||||
|
|
||||||
device_specs = []
|
device_specs = []
|
||||||
@ -157,9 +161,10 @@ class VMLifecycleMixin(MCPMixin):
|
|||||||
disk_spec.device.backing = vim.vm.device.VirtualDisk.FlatVer2BackingInfo()
|
disk_spec.device.backing = vim.vm.device.VirtualDisk.FlatVer2BackingInfo()
|
||||||
disk_spec.device.backing.diskMode = "persistent"
|
disk_spec.device.backing.diskMode = "persistent"
|
||||||
disk_spec.device.backing.thinProvisioned = True
|
disk_spec.device.backing.thinProvisioned = True
|
||||||
disk_spec.device.backing.datastore = datastore_obj
|
disk_spec.device.backing.fileName = f"[{datastore_obj.name}]"
|
||||||
disk_spec.device.controllerKey = controller_spec.device.key
|
disk_spec.device.controllerKey = controller_spec.device.key
|
||||||
disk_spec.device.unitNumber = 0
|
disk_spec.device.unitNumber = 0
|
||||||
|
disk_spec.device.key = -1 # Negative key for new device
|
||||||
device_specs.append(disk_spec)
|
device_specs.append(disk_spec)
|
||||||
|
|
||||||
# Add network adapter if network is available
|
# Add network adapter if network is available
|
||||||
|
|||||||
458
test_client.py
458
test_client.py
@ -1,5 +1,5 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
"""Simple MCP client to test the ESXi MCP server."""
|
"""Comprehensive MCP client to test all read-only ESXi MCP server tools."""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import json
|
import json
|
||||||
@ -24,9 +24,74 @@ def load_env_file(path: str = ".env") -> dict[str, str]:
|
|||||||
return env
|
return env
|
||||||
|
|
||||||
|
|
||||||
|
def print_result(data, indent=2, max_items=5):
|
||||||
|
"""Pretty print result data with truncation."""
|
||||||
|
if isinstance(data, list):
|
||||||
|
# Check for empty-result message
|
||||||
|
if data and isinstance(data[0], dict) and "message" in data[0] and "count" in data[0]:
|
||||||
|
print(f" {data[0]['message']}")
|
||||||
|
return
|
||||||
|
print(f" Found {len(data)} items:")
|
||||||
|
for item in data[:max_items]:
|
||||||
|
if isinstance(item, dict):
|
||||||
|
summary = ", ".join(f"{k}={v}" for k, v in list(item.items())[:4])
|
||||||
|
print(f" - {summary[:100]}...")
|
||||||
|
else:
|
||||||
|
print(f" - {item}")
|
||||||
|
if len(data) > max_items:
|
||||||
|
print(f" ... and {len(data) - max_items} more")
|
||||||
|
elif isinstance(data, dict):
|
||||||
|
for k, v in list(data.items())[:8]:
|
||||||
|
val_str = str(v)[:60] + "..." if len(str(v)) > 60 else str(v)
|
||||||
|
print(f" {k}: {val_str}")
|
||||||
|
else:
|
||||||
|
print(f" {data}")
|
||||||
|
|
||||||
|
|
||||||
|
async def test_tool(session, name: str, args: dict = None, description: str = ""):
|
||||||
|
"""Test a single tool and print results."""
|
||||||
|
args = args or {}
|
||||||
|
print(f"\n{'─' * 60}")
|
||||||
|
print(f"Testing: {name} {description}")
|
||||||
|
print(f"{'─' * 60}")
|
||||||
|
try:
|
||||||
|
result = await session.call_tool(name, args)
|
||||||
|
if result.content:
|
||||||
|
data = json.loads(result.content[0].text)
|
||||||
|
print_result(data)
|
||||||
|
return data
|
||||||
|
else:
|
||||||
|
print(" No content returned")
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ERROR: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
async def test_resource(session, uri: str):
|
||||||
|
"""Test reading an MCP resource."""
|
||||||
|
print(f"\n{'─' * 60}")
|
||||||
|
print(f"Resource: {uri}")
|
||||||
|
print(f"{'─' * 60}")
|
||||||
|
try:
|
||||||
|
result = await session.read_resource(uri)
|
||||||
|
if result.contents:
|
||||||
|
data = json.loads(result.contents[0].text)
|
||||||
|
print_result(data)
|
||||||
|
return data
|
||||||
|
else:
|
||||||
|
print(" No content returned")
|
||||||
|
return None
|
||||||
|
except Exception as e:
|
||||||
|
print(f" ERROR: {e}")
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
"""Test the ESXi MCP server."""
|
"""Test all read-only ESXi MCP server tools."""
|
||||||
print("🔌 Connecting to ESXi MCP server...")
|
print("=" * 60)
|
||||||
|
print("ESXi MCP Server - Comprehensive Read-Only Test Suite")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
# Load from .env file
|
# Load from .env file
|
||||||
dotenv = load_env_file()
|
dotenv = load_env_file()
|
||||||
@ -40,260 +105,185 @@ async def main():
|
|||||||
"VCENTER_USER": dotenv.get("VCENTER_USER", os.environ.get("VCENTER_USER", "")),
|
"VCENTER_USER": dotenv.get("VCENTER_USER", os.environ.get("VCENTER_USER", "")),
|
||||||
"VCENTER_PASSWORD": dotenv.get("VCENTER_PASSWORD", os.environ.get("VCENTER_PASSWORD", "")),
|
"VCENTER_PASSWORD": dotenv.get("VCENTER_PASSWORD", os.environ.get("VCENTER_PASSWORD", "")),
|
||||||
"VCENTER_INSECURE": dotenv.get("VCENTER_INSECURE", os.environ.get("VCENTER_INSECURE", "true")),
|
"VCENTER_INSECURE": dotenv.get("VCENTER_INSECURE", os.environ.get("VCENTER_INSECURE", "true")),
|
||||||
"MCP_TRANSPORT": "stdio", # Force stdio transport
|
"MCP_TRANSPORT": "stdio",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
async with stdio_client(server_params) as (read, write):
|
async with stdio_client(server_params) as (read, write):
|
||||||
async with ClientSession(read, write) as session:
|
async with ClientSession(read, write) as session:
|
||||||
# Initialize
|
|
||||||
await session.initialize()
|
await session.initialize()
|
||||||
print("✓ Connected!\n")
|
print("\n✓ Connected to ESXi MCP Server\n")
|
||||||
|
|
||||||
# List available tools
|
# ─────────────────────────────────────────────────────────────
|
||||||
print("=== Available Tools ===")
|
# List available tools and resources
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
tools_result = await session.list_tools()
|
tools_result = await session.list_tools()
|
||||||
print(f"Total: {len(tools_result.tools)} tools")
|
|
||||||
|
|
||||||
# Group by category (based on tool name patterns)
|
|
||||||
categories = {
|
|
||||||
"VM Lifecycle": [],
|
|
||||||
"Power Ops": [],
|
|
||||||
"Snapshots": [],
|
|
||||||
"Guest Ops": [],
|
|
||||||
"Monitoring": [],
|
|
||||||
"Datastore": [],
|
|
||||||
"Disk Mgmt": [],
|
|
||||||
"NIC Mgmt": [],
|
|
||||||
"Host Mgmt": [],
|
|
||||||
"OVF/OVA": [],
|
|
||||||
"Other": [],
|
|
||||||
}
|
|
||||||
|
|
||||||
for tool in tools_result.tools:
|
|
||||||
name = tool.name
|
|
||||||
if name in ["create_vm", "clone_vm", "delete_vm", "reconfigure_vm", "get_vm_info"]:
|
|
||||||
categories["VM Lifecycle"].append(name)
|
|
||||||
elif "power" in name or name in ["reset_vm", "suspend_vm"]:
|
|
||||||
categories["Power Ops"].append(name)
|
|
||||||
elif "snapshot" in name:
|
|
||||||
categories["Snapshots"].append(name)
|
|
||||||
elif "guest" in name or "execute" in name:
|
|
||||||
categories["Guest Ops"].append(name)
|
|
||||||
elif "metrics" in name or "events" in name or "alarms" in name:
|
|
||||||
categories["Monitoring"].append(name)
|
|
||||||
elif "datastore" in name or "browse" in name or "upload" in name or "download" in name:
|
|
||||||
categories["Datastore"].append(name)
|
|
||||||
elif "disk" in name or "iso" in name:
|
|
||||||
categories["Disk Mgmt"].append(name)
|
|
||||||
elif "nic" in name or "mac" in name:
|
|
||||||
categories["NIC Mgmt"].append(name)
|
|
||||||
elif "host" in name or "maintenance" in name or "service" in name or "ntp" in name:
|
|
||||||
categories["Host Mgmt"].append(name)
|
|
||||||
elif "ovf" in name:
|
|
||||||
categories["OVF/OVA"].append(name)
|
|
||||||
else:
|
|
||||||
categories["Other"].append(name)
|
|
||||||
|
|
||||||
for cat, tools in categories.items():
|
|
||||||
if tools:
|
|
||||||
print(f" {cat}: {len(tools)} tools")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# List available resources
|
|
||||||
print("=== Available Resources ===")
|
|
||||||
resources_result = await session.list_resources()
|
resources_result = await session.list_resources()
|
||||||
for r in resources_result.resources:
|
print(f"Available: {len(tools_result.tools)} tools, {len(resources_result.resources)} resources")
|
||||||
print(f" • {r.uri} ({r.name})")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 1: Browse datastore
|
# ─────────────────────────────────────────────────────────────
|
||||||
print("=== Test 1: Browse Datastore ===")
|
# Test MCP Resources
|
||||||
result = await session.call_tool("browse_datastore", {
|
# ─────────────────────────────────────────────────────────────
|
||||||
"datastore": "c1_ds-02",
|
print("\n" + "=" * 60)
|
||||||
"path": "iso"
|
print("SECTION 1: MCP Resources")
|
||||||
})
|
print("=" * 60)
|
||||||
files = json.loads(result.content[0].text)
|
|
||||||
print(f"Found {len(files)} files in iso/")
|
|
||||||
for f in files[:5]:
|
|
||||||
print(f" {f['name']}: {f['size_human']}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 2: Read VMs resource
|
vms = await test_resource(session, "esxi://vms")
|
||||||
print("=== Test 2: Read Resource (esxi://vms) ===")
|
await test_resource(session, "esxi://hosts")
|
||||||
try:
|
await test_resource(session, "esxi://datastores")
|
||||||
result = await session.read_resource("esxi://vms")
|
await test_resource(session, "esxi://networks")
|
||||||
vms = json.loads(result.contents[0].text)
|
await test_resource(session, "esxi://clusters")
|
||||||
print(f"Found {len(vms)} VMs:")
|
|
||||||
for vm in vms[:5]:
|
|
||||||
print(f" • {vm['name']}: {vm['power_state']}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Resource read error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 3: Get Host Info (NEW!)
|
# Get a VM name for subsequent tests
|
||||||
print("=== Test 3: Get Host Info (NEW!) ===")
|
vm_name = vms[0]["name"] if vms else None
|
||||||
try:
|
print(f"\n>>> Using VM '{vm_name}' for subsequent tests")
|
||||||
result = await session.call_tool("get_host_info", {})
|
|
||||||
host_info = json.loads(result.content[0].text)
|
|
||||||
print(f" Host: {host_info.get('name', 'N/A')}")
|
|
||||||
prod = host_info.get('product', {})
|
|
||||||
print(f" ESXi: {prod.get('version', 'N/A')} (build {prod.get('build', 'N/A')})")
|
|
||||||
hw = host_info.get('hardware', {})
|
|
||||||
print(f" CPU: {hw.get('cpu_cores', 'N/A')} cores @ {hw.get('cpu_mhz', 'N/A')} MHz")
|
|
||||||
print(f" RAM: {hw.get('memory_gb', 'N/A')} GB")
|
|
||||||
status = host_info.get('status', {})
|
|
||||||
print(f" Maintenance: {status.get('maintenance_mode', 'N/A')}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 4: List Services (NEW!)
|
# ─────────────────────────────────────────────────────────────
|
||||||
print("=== Test 4: List Services (NEW!) ===")
|
# VM Lifecycle (read-only)
|
||||||
try:
|
# ─────────────────────────────────────────────────────────────
|
||||||
result = await session.call_tool("list_services", {})
|
print("\n" + "=" * 60)
|
||||||
services = json.loads(result.content[0].text)
|
print("SECTION 2: VM Lifecycle Tools")
|
||||||
running = [s for s in services if s.get('running')]
|
print("=" * 60)
|
||||||
print(f"Found {len(services)} services ({len(running)} running):")
|
|
||||||
for s in running[:8]:
|
|
||||||
print(f" ✓ {s['key']}: {s['label']}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 5: Get NTP Config (NEW!)
|
await test_tool(session, "list_vms")
|
||||||
print("=== Test 5: Get NTP Config (NEW!) ===")
|
if vm_name:
|
||||||
try:
|
await test_tool(session, "get_vm_info", {"name": vm_name})
|
||||||
result = await session.call_tool("get_ntp_config", {})
|
|
||||||
ntp = json.loads(result.content[0].text)
|
|
||||||
print(f" NTP Servers: {ntp.get('ntp_servers', [])}")
|
|
||||||
print(f" Service Running: {ntp.get('service_running', 'N/A')}")
|
|
||||||
print(f" Timezone: {ntp.get('timezone', 'N/A')}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 6: List VM Disks (if there are VMs)
|
# ─────────────────────────────────────────────────────────────
|
||||||
print("=== Test 6: List VM Disks (NEW!) ===")
|
# Monitoring Tools
|
||||||
try:
|
# ─────────────────────────────────────────────────────────────
|
||||||
# First get a VM name
|
print("\n" + "=" * 60)
|
||||||
result = await session.read_resource("esxi://vms")
|
print("SECTION 3: Monitoring Tools")
|
||||||
vms = json.loads(result.contents[0].text)
|
print("=" * 60)
|
||||||
if vms:
|
|
||||||
vm_name = vms[0]['name']
|
|
||||||
print(f" Checking disks on: {vm_name}")
|
|
||||||
result = await session.call_tool("list_disks", {"vm_name": vm_name})
|
|
||||||
disks = json.loads(result.content[0].text)
|
|
||||||
for d in disks:
|
|
||||||
print(f" • {d['label']}: {d.get('size_gb', 'N/A')} GB")
|
|
||||||
else:
|
|
||||||
print(" No VMs found to check")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 7: List VM NICs (NEW!)
|
await test_tool(session, "list_hosts")
|
||||||
print("=== Test 7: List VM NICs (NEW!) ===")
|
await test_tool(session, "get_host_stats")
|
||||||
try:
|
if vm_name:
|
||||||
if vms:
|
await test_tool(session, "get_vm_stats", {"name": vm_name})
|
||||||
vm_name = vms[0]['name']
|
await test_tool(session, "get_alarms")
|
||||||
print(f" Checking NICs on: {vm_name}")
|
await test_tool(session, "get_recent_events", {"count": 5})
|
||||||
result = await session.call_tool("list_nics", {"vm_name": vm_name})
|
await test_tool(session, "get_recent_tasks", {"count": 5})
|
||||||
nics = json.loads(result.content[0].text)
|
|
||||||
for n in nics:
|
|
||||||
print(f" • {n['label']}: {n.get('type', 'N/A')} - {n.get('network', 'N/A')}")
|
|
||||||
print(f" MAC: {n.get('mac_address', 'N/A')}, Connected: {n.get('connected', 'N/A')}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# ─────────────────────────────────────────────────────────────────────────
|
# ─────────────────────────────────────────────────────────────
|
||||||
# vCenter-Specific Tests (NEW!)
|
# Host Management Tools
|
||||||
# ─────────────────────────────────────────────────────────────────────────
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("SECTION 4: Host Management Tools")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
# Test 8: List Folders
|
await test_tool(session, "get_host_info")
|
||||||
print("=== Test 8: List Folders (vCenter) ===")
|
await test_tool(session, "get_host_hardware")
|
||||||
try:
|
await test_tool(session, "get_host_networking")
|
||||||
result = await session.call_tool("list_folders", {})
|
await test_tool(session, "list_services")
|
||||||
folders = json.loads(result.content[0].text)
|
await test_tool(session, "get_ntp_config")
|
||||||
print(f"Found {len(folders)} folders:")
|
|
||||||
for f in folders[:10]:
|
|
||||||
print(f" 📁 {f['path']} ({f.get('children', 0)} children)")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 9: List Clusters
|
# ─────────────────────────────────────────────────────────────
|
||||||
print("=== Test 9: List Clusters (vCenter) ===")
|
# Datastore/Resources Tools
|
||||||
try:
|
# ─────────────────────────────────────────────────────────────
|
||||||
result = await session.call_tool("list_clusters", {})
|
print("\n" + "=" * 60)
|
||||||
clusters = json.loads(result.content[0].text)
|
print("SECTION 5: Datastore & Resource Tools")
|
||||||
# Check if it's an empty-result message
|
print("=" * 60)
|
||||||
if clusters and 'message' in clusters[0]:
|
|
||||||
print(f" {clusters[0]['message']}")
|
|
||||||
elif clusters:
|
|
||||||
print(f"Found {len(clusters)} clusters:")
|
|
||||||
for c in clusters:
|
|
||||||
print(f" 🖥️ {c['name']}: {c.get('host_count', 0)} hosts")
|
|
||||||
print(f" DRS: {c.get('drs', {}).get('enabled', 'N/A')}, HA: {c.get('ha', {}).get('enabled', 'N/A')}")
|
|
||||||
else:
|
|
||||||
print(" No clusters found (standalone host mode)")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 10: List Recent Tasks
|
# Get datastore name from resources
|
||||||
print("=== Test 10: Recent Tasks (vCenter) ===")
|
ds_result = await session.read_resource("esxi://datastores")
|
||||||
try:
|
datastores = json.loads(ds_result.contents[0].text) if ds_result.contents else []
|
||||||
result = await session.call_tool("list_recent_tasks", {"max_count": 5})
|
ds_name = datastores[0]["name"] if datastores else None
|
||||||
tasks = json.loads(result.content[0].text)
|
print(f"\n>>> Using datastore '{ds_name}' for tests")
|
||||||
# Check if it's an empty-result message
|
|
||||||
if tasks and 'message' in tasks[0]:
|
|
||||||
print(f" {tasks[0]['message']}")
|
|
||||||
else:
|
|
||||||
print(f"Found {len(tasks)} recent tasks:")
|
|
||||||
for t in tasks[:5]:
|
|
||||||
entity = t.get('entity', 'N/A')
|
|
||||||
print(f" 📋 {t.get('name', 'N/A')} - {t.get('state', 'N/A')}")
|
|
||||||
print(f" Entity: {entity} | Started: {t.get('start_time', 'N/A')}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 11: Get Alarms
|
if ds_name:
|
||||||
print("=== Test 11: Get Alarms (vCenter) ===")
|
await test_tool(session, "get_datastore_info", {"name": ds_name})
|
||||||
try:
|
await test_tool(session, "browse_datastore", {"datastore": ds_name, "path": ""})
|
||||||
result = await session.call_tool("get_alarms", {})
|
|
||||||
alarms = json.loads(result.content[0].text)
|
|
||||||
if alarms:
|
|
||||||
print(f"Found {len(alarms)} active alarms:")
|
|
||||||
for a in alarms[:5]:
|
|
||||||
print(f" ⚠️ {a.get('alarm', 'N/A')} on {a.get('entity', 'N/A')}")
|
|
||||||
print(f" Status: {a.get('status', 'N/A')}, Acknowledged: {a.get('acknowledged', 'N/A')}")
|
|
||||||
else:
|
|
||||||
print(" No active alarms (all clear!)")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
# Test 12: List Recent Events
|
await test_tool(session, "get_vcenter_info")
|
||||||
print("=== Test 12: Recent Events (vCenter) ===")
|
await test_tool(session, "get_resource_pool_info")
|
||||||
try:
|
|
||||||
result = await session.call_tool("list_recent_events", {"max_count": 5, "hours_back": 24})
|
|
||||||
events = json.loads(result.content[0].text)
|
|
||||||
print(f"Found {len(events)} events in last 24h:")
|
|
||||||
for e in events[:5]:
|
|
||||||
vm_info = f" (VM: {e['vm']})" if e.get('vm') else ""
|
|
||||||
print(f" 📌 {e.get('type', 'N/A')}{vm_info}")
|
|
||||||
msg = e.get('message', 'N/A')
|
|
||||||
if len(msg) > 80:
|
|
||||||
msg = msg[:77] + "..."
|
|
||||||
print(f" {msg}")
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error: {e}")
|
|
||||||
print()
|
|
||||||
|
|
||||||
print("✅ All tests completed!")
|
# Get network name
|
||||||
|
net_result = await session.read_resource("esxi://networks")
|
||||||
|
networks = json.loads(net_result.contents[0].text) if net_result.contents else []
|
||||||
|
net_name = networks[0]["name"] if networks else None
|
||||||
|
if net_name:
|
||||||
|
await test_tool(session, "get_network_info", {"name": net_name})
|
||||||
|
|
||||||
|
await test_tool(session, "list_templates")
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# Disk Management Tools
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("SECTION 6: Disk Management Tools")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
if vm_name:
|
||||||
|
await test_tool(session, "list_disks", {"vm_name": vm_name})
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# NIC Management Tools
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("SECTION 7: NIC Management Tools")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
if vm_name:
|
||||||
|
await test_tool(session, "list_nics", {"vm_name": vm_name})
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# Snapshot Tools
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("SECTION 8: Snapshot Tools")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
if vm_name:
|
||||||
|
await test_tool(session, "list_snapshots", {"name": vm_name})
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# OVF Tools
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("SECTION 9: OVF Tools")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
await test_tool(session, "list_ovf_networks")
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# vCenter-Specific Tools
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("SECTION 10: vCenter-Specific Tools")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
await test_tool(session, "list_folders")
|
||||||
|
await test_tool(session, "list_clusters")
|
||||||
|
await test_tool(session, "list_recent_tasks", {"max_count": 5})
|
||||||
|
await test_tool(session, "list_recent_events", {"max_count": 5, "hours_back": 24})
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# Guest Operations (require VMware Tools + credentials)
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("SECTION 11: Guest Operations (may fail without VMware Tools)")
|
||||||
|
print("=" * 60)
|
||||||
|
|
||||||
|
# These typically need a running VM with VMware Tools
|
||||||
|
# and guest credentials - expect failures on most VMs
|
||||||
|
if vm_name:
|
||||||
|
await test_tool(
|
||||||
|
session, "list_guest_processes",
|
||||||
|
{"name": vm_name, "username": "root", "password": "test"},
|
||||||
|
"(expected to fail without valid credentials)"
|
||||||
|
)
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# Summary
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print("\n" + "=" * 60)
|
||||||
|
print("TEST SUMMARY")
|
||||||
|
print("=" * 60)
|
||||||
|
print(f"✅ Read-only test suite completed")
|
||||||
|
print(f" Tools available: {len(tools_result.tools)}")
|
||||||
|
print(f" Resources available: {len(resources_result.resources)}")
|
||||||
|
print(f"\nNote: Guest operations require VMware Tools + valid credentials")
|
||||||
|
print("Note: Some vCenter tools return empty on standalone hosts")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
|||||||
503
test_destructive.py
Normal file
503
test_destructive.py
Normal file
@ -0,0 +1,503 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Destructive test suite for ESXi MCP server - creates/modifies/deletes resources.
|
||||||
|
|
||||||
|
WARNING: This test creates real VMs and modifies infrastructure!
|
||||||
|
Only run in a test environment.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python test_destructive.py [--skip-cleanup]
|
||||||
|
|
||||||
|
--skip-cleanup: Leave test VM for inspection (default: cleanup)
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from mcp import ClientSession, StdioServerParameters
|
||||||
|
from mcp.client.stdio import stdio_client
|
||||||
|
|
||||||
|
# Test configuration
|
||||||
|
TEST_VM_NAME = f"mcp-test-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
|
||||||
|
TEST_FOLDER_NAME = f"mcp-test-folder-{datetime.now().strftime('%H%M%S')}"
|
||||||
|
SKIP_CLEANUP = "--skip-cleanup" in sys.argv
|
||||||
|
|
||||||
|
|
||||||
|
def load_env_file(path: str = ".env") -> dict[str, str]:
|
||||||
|
"""Load environment variables from a .env file."""
|
||||||
|
env = {}
|
||||||
|
env_path = Path(path)
|
||||||
|
if env_path.exists():
|
||||||
|
with open(env_path) as f:
|
||||||
|
for line in f:
|
||||||
|
line = line.strip()
|
||||||
|
if line and not line.startswith("#") and "=" in line:
|
||||||
|
key, _, value = line.partition("=")
|
||||||
|
env[key.strip()] = value.strip()
|
||||||
|
return env
|
||||||
|
|
||||||
|
|
||||||
|
class TestResult:
|
||||||
|
"""Track test results."""
|
||||||
|
def __init__(self):
|
||||||
|
self.passed = 0
|
||||||
|
self.failed = 0
|
||||||
|
self.skipped = 0
|
||||||
|
self.errors = []
|
||||||
|
|
||||||
|
def record(self, name: str, success: bool, error: str = None):
|
||||||
|
if success:
|
||||||
|
self.passed += 1
|
||||||
|
print(f" ✅ {name}")
|
||||||
|
else:
|
||||||
|
self.failed += 1
|
||||||
|
self.errors.append((name, error))
|
||||||
|
print(f" ❌ {name}: {error}")
|
||||||
|
|
||||||
|
def skip(self, name: str, reason: str):
|
||||||
|
self.skipped += 1
|
||||||
|
print(f" ⏭️ {name}: {reason}")
|
||||||
|
|
||||||
|
def summary(self):
|
||||||
|
total = self.passed + self.failed + self.skipped
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("DESTRUCTIVE TEST SUMMARY")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
print(f" Passed: {self.passed}/{total}")
|
||||||
|
print(f" Failed: {self.failed}/{total}")
|
||||||
|
print(f" Skipped: {self.skipped}/{total}")
|
||||||
|
if self.errors:
|
||||||
|
print(f"\nErrors:")
|
||||||
|
for name, error in self.errors:
|
||||||
|
print(f" - {name}: {error}")
|
||||||
|
return self.failed == 0
|
||||||
|
|
||||||
|
|
||||||
|
async def call_tool(session, name: str, args: dict = None) -> tuple[bool, any]:
|
||||||
|
"""Call a tool and return (success, result)."""
|
||||||
|
args = args or {}
|
||||||
|
try:
|
||||||
|
result = await session.call_tool(name, args)
|
||||||
|
if result.content:
|
||||||
|
text = result.content[0].text
|
||||||
|
# Try to parse as JSON, fall back to plain text
|
||||||
|
try:
|
||||||
|
data = json.loads(text)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
data = text
|
||||||
|
return True, data
|
||||||
|
return True, None
|
||||||
|
except Exception as e:
|
||||||
|
return False, str(e)
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
"""Run destructive tests."""
|
||||||
|
print("=" * 60)
|
||||||
|
print("ESXi MCP Server - DESTRUCTIVE Test Suite")
|
||||||
|
print("=" * 60)
|
||||||
|
print(f"\n⚠️ WARNING: This test will CREATE and MODIFY resources!")
|
||||||
|
print(f" Test VM: {TEST_VM_NAME}")
|
||||||
|
print(f" Cleanup: {'DISABLED' if SKIP_CLEANUP else 'ENABLED'}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
results = TestResult()
|
||||||
|
dotenv = load_env_file()
|
||||||
|
|
||||||
|
# Get datastore and network from env or use defaults
|
||||||
|
default_datastore = dotenv.get("VCENTER_DATASTORE", "datastore1")
|
||||||
|
default_network = dotenv.get("VCENTER_NETWORK", "VM Network")
|
||||||
|
|
||||||
|
server_params = StdioServerParameters(
|
||||||
|
command="uv",
|
||||||
|
args=["run", "esxi-mcp-server"],
|
||||||
|
env={
|
||||||
|
**os.environ,
|
||||||
|
"VCENTER_HOST": dotenv.get("VCENTER_HOST", ""),
|
||||||
|
"VCENTER_USER": dotenv.get("VCENTER_USER", ""),
|
||||||
|
"VCENTER_PASSWORD": dotenv.get("VCENTER_PASSWORD", ""),
|
||||||
|
"VCENTER_INSECURE": dotenv.get("VCENTER_INSECURE", "true"),
|
||||||
|
"MCP_TRANSPORT": "stdio",
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
async with stdio_client(server_params) as (read, write):
|
||||||
|
async with ClientSession(read, write) as session:
|
||||||
|
await session.initialize()
|
||||||
|
print("✓ Connected to ESXi MCP Server\n")
|
||||||
|
|
||||||
|
# Get available datastores and networks for test
|
||||||
|
ds_result = await session.read_resource("esxi://datastores")
|
||||||
|
datastores = json.loads(ds_result.contents[0].text) if ds_result.contents else []
|
||||||
|
datastore = datastores[0]["name"] if datastores else default_datastore
|
||||||
|
|
||||||
|
net_result = await session.read_resource("esxi://networks")
|
||||||
|
networks = json.loads(net_result.contents[0].text) if net_result.contents else []
|
||||||
|
network = networks[0]["name"] if networks else default_network
|
||||||
|
|
||||||
|
print(f"Using datastore: {datastore}")
|
||||||
|
print(f"Using network: {network}")
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# SECTION 1: VM Lifecycle
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("SECTION 1: VM Lifecycle")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
# Create VM
|
||||||
|
print(f"\n>>> Creating test VM: {TEST_VM_NAME}")
|
||||||
|
success, data = await call_tool(session, "create_vm", {
|
||||||
|
"name": TEST_VM_NAME,
|
||||||
|
"cpu": 1,
|
||||||
|
"memory_mb": 512,
|
||||||
|
"disk_gb": 1,
|
||||||
|
"guest_id": "otherGuest64",
|
||||||
|
"datastore": datastore,
|
||||||
|
"network": network,
|
||||||
|
})
|
||||||
|
results.record("create_vm", success, data if not success else None)
|
||||||
|
|
||||||
|
if not success:
|
||||||
|
print("\n❌ Cannot continue without test VM. Aborting.")
|
||||||
|
results.summary()
|
||||||
|
return
|
||||||
|
|
||||||
|
# Get VM info
|
||||||
|
success, data = await call_tool(session, "get_vm_info", {"name": TEST_VM_NAME})
|
||||||
|
results.record("get_vm_info (new VM)", success, data if not success else None)
|
||||||
|
|
||||||
|
# Rename VM (and rename back)
|
||||||
|
new_name = f"{TEST_VM_NAME}-renamed"
|
||||||
|
success, data = await call_tool(session, "rename_vm", {
|
||||||
|
"name": TEST_VM_NAME,
|
||||||
|
"new_name": new_name,
|
||||||
|
})
|
||||||
|
results.record("rename_vm", success, data if not success else None)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
# Rename back
|
||||||
|
await call_tool(session, "rename_vm", {"name": new_name, "new_name": TEST_VM_NAME})
|
||||||
|
|
||||||
|
# Reconfigure VM
|
||||||
|
success, data = await call_tool(session, "reconfigure_vm", {
|
||||||
|
"name": TEST_VM_NAME,
|
||||||
|
"memory_mb": 1024,
|
||||||
|
})
|
||||||
|
results.record("reconfigure_vm (memory)", success, data if not success else None)
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# SECTION 2: Power Operations
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("SECTION 2: Power Operations")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
# Power on
|
||||||
|
success, data = await call_tool(session, "power_on", {"name": TEST_VM_NAME})
|
||||||
|
results.record("power_on", success, data if not success else None)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
# Wait a moment for power state to stabilize
|
||||||
|
await asyncio.sleep(3)
|
||||||
|
|
||||||
|
# Suspend
|
||||||
|
success, data = await call_tool(session, "suspend_vm", {"name": TEST_VM_NAME})
|
||||||
|
results.record("suspend_vm", success, data if not success else None)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
# Power on again to test power off
|
||||||
|
await call_tool(session, "power_on", {"name": TEST_VM_NAME})
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
# Power off
|
||||||
|
success, data = await call_tool(session, "power_off", {"name": TEST_VM_NAME})
|
||||||
|
results.record("power_off", success, data if not success else None)
|
||||||
|
else:
|
||||||
|
results.skip("suspend_vm", "power_on failed")
|
||||||
|
results.skip("power_off", "power_on failed")
|
||||||
|
|
||||||
|
# Ensure VM is off for disk operations
|
||||||
|
await call_tool(session, "power_off", {"name": TEST_VM_NAME})
|
||||||
|
await asyncio.sleep(2)
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# SECTION 3: Disk Management
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("SECTION 3: Disk Management")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
# Add disk
|
||||||
|
success, data = await call_tool(session, "add_disk", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"size_gb": 1,
|
||||||
|
"thin_provisioned": True,
|
||||||
|
})
|
||||||
|
results.record("add_disk", success, data if not success else None)
|
||||||
|
|
||||||
|
# List disks
|
||||||
|
success, data = await call_tool(session, "list_disks", {"vm_name": TEST_VM_NAME})
|
||||||
|
results.record("list_disks (after add)", success, data if not success else None)
|
||||||
|
disk_count = len(data) if success and isinstance(data, list) else 0
|
||||||
|
|
||||||
|
# Extend disk
|
||||||
|
if disk_count > 0:
|
||||||
|
success, data = await call_tool(session, "extend_disk", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"disk_label": "Hard disk 1",
|
||||||
|
"new_size_gb": 2,
|
||||||
|
})
|
||||||
|
results.record("extend_disk", success, data if not success else None)
|
||||||
|
|
||||||
|
# Remove the added disk (Hard disk 2)
|
||||||
|
if disk_count >= 2:
|
||||||
|
success, data = await call_tool(session, "remove_disk", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"disk_label": "Hard disk 2",
|
||||||
|
})
|
||||||
|
results.record("remove_disk", success, data if not success else None)
|
||||||
|
else:
|
||||||
|
results.skip("remove_disk", "Not enough disks")
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# SECTION 4: NIC Management
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("SECTION 4: NIC Management")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
# Add NIC
|
||||||
|
success, data = await call_tool(session, "add_nic", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"network": network,
|
||||||
|
"nic_type": "vmxnet3",
|
||||||
|
})
|
||||||
|
results.record("add_nic", success, data if not success else None)
|
||||||
|
|
||||||
|
# List NICs
|
||||||
|
success, data = await call_tool(session, "list_nics", {"vm_name": TEST_VM_NAME})
|
||||||
|
results.record("list_nics (after add)", success, data if not success else None)
|
||||||
|
nic_count = len(data) if success and isinstance(data, list) else 0
|
||||||
|
|
||||||
|
# Connect/disconnect NIC
|
||||||
|
if nic_count > 0:
|
||||||
|
success, data = await call_tool(session, "connect_nic", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"nic_label": "Network adapter 1",
|
||||||
|
"connected": False,
|
||||||
|
})
|
||||||
|
results.record("connect_nic (disconnect)", success, data if not success else None)
|
||||||
|
|
||||||
|
# Remove added NIC (Network adapter 2)
|
||||||
|
if nic_count >= 2:
|
||||||
|
success, data = await call_tool(session, "remove_nic", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"nic_label": "Network adapter 2",
|
||||||
|
})
|
||||||
|
results.record("remove_nic", success, data if not success else None)
|
||||||
|
else:
|
||||||
|
results.skip("remove_nic", "Not enough NICs")
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# SECTION 5: Snapshots
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("SECTION 5: Snapshots")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
# Create snapshot
|
||||||
|
success, data = await call_tool(session, "create_snapshot", {
|
||||||
|
"name": TEST_VM_NAME,
|
||||||
|
"snapshot_name": "test-snapshot-1",
|
||||||
|
"description": "MCP test snapshot",
|
||||||
|
})
|
||||||
|
results.record("create_snapshot", success, data if not success else None)
|
||||||
|
|
||||||
|
# List snapshots
|
||||||
|
success, data = await call_tool(session, "list_snapshots", {"name": TEST_VM_NAME})
|
||||||
|
results.record("list_snapshots", success, data if not success else None)
|
||||||
|
|
||||||
|
# Rename snapshot
|
||||||
|
success, data = await call_tool(session, "rename_snapshot", {
|
||||||
|
"name": TEST_VM_NAME,
|
||||||
|
"snapshot_name": "test-snapshot-1",
|
||||||
|
"new_name": "renamed-snapshot",
|
||||||
|
"new_description": "Renamed by MCP test",
|
||||||
|
})
|
||||||
|
results.record("rename_snapshot", success, data if not success else None)
|
||||||
|
|
||||||
|
# Revert to snapshot
|
||||||
|
success, data = await call_tool(session, "revert_to_snapshot", {
|
||||||
|
"name": TEST_VM_NAME,
|
||||||
|
"snapshot_name": "renamed-snapshot",
|
||||||
|
})
|
||||||
|
results.record("revert_to_snapshot", success, data if not success else None)
|
||||||
|
|
||||||
|
# Delete snapshot
|
||||||
|
success, data = await call_tool(session, "delete_snapshot", {
|
||||||
|
"name": TEST_VM_NAME,
|
||||||
|
"snapshot_name": "renamed-snapshot",
|
||||||
|
})
|
||||||
|
results.record("delete_snapshot", success, data if not success else None)
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# SECTION 6: Folder Operations (vCenter)
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("SECTION 6: Folder Operations (vCenter)")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
# Create folder
|
||||||
|
success, data = await call_tool(session, "create_folder", {
|
||||||
|
"folder_name": TEST_FOLDER_NAME,
|
||||||
|
})
|
||||||
|
results.record("create_folder", success, data if not success else None)
|
||||||
|
folder_created = success
|
||||||
|
|
||||||
|
# Move VM to folder
|
||||||
|
if folder_created:
|
||||||
|
success, data = await call_tool(session, "move_vm_to_folder", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"folder_path": f"vm/{TEST_FOLDER_NAME}",
|
||||||
|
})
|
||||||
|
results.record("move_vm_to_folder", success, data if not success else None)
|
||||||
|
|
||||||
|
# Move back to root for cleanup
|
||||||
|
if success:
|
||||||
|
await call_tool(session, "move_vm_to_folder", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"folder_path": "vm",
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
results.skip("move_vm_to_folder", "folder creation failed")
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# SECTION 7: Datastore Operations
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("SECTION 7: Datastore Operations")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
# Create folder in datastore
|
||||||
|
test_ds_folder = f"mcp-test-{datetime.now().strftime('%H%M%S')}"
|
||||||
|
success, data = await call_tool(session, "create_datastore_folder", {
|
||||||
|
"datastore": datastore,
|
||||||
|
"path": test_ds_folder,
|
||||||
|
})
|
||||||
|
results.record("create_datastore_folder", success, data if not success else None)
|
||||||
|
ds_folder_created = success
|
||||||
|
|
||||||
|
# Delete datastore folder
|
||||||
|
if ds_folder_created:
|
||||||
|
success, data = await call_tool(session, "delete_datastore_file", {
|
||||||
|
"datastore": datastore,
|
||||||
|
"path": test_ds_folder,
|
||||||
|
})
|
||||||
|
results.record("delete_datastore_file (folder)", success, data if not success else None)
|
||||||
|
else:
|
||||||
|
results.skip("delete_datastore_file", "folder creation failed")
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# SECTION 8: vCenter Advanced Operations
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("SECTION 8: vCenter Advanced Operations")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
# Storage vMotion - move VM to different datastore
|
||||||
|
# Get list of datastores to find a second one
|
||||||
|
if len(datastores) >= 2:
|
||||||
|
# Find a different datastore
|
||||||
|
other_datastore = None
|
||||||
|
for ds in datastores:
|
||||||
|
if ds["name"] != datastore:
|
||||||
|
other_datastore = ds["name"]
|
||||||
|
break
|
||||||
|
|
||||||
|
if other_datastore:
|
||||||
|
print(f"\n>>> Storage vMotion: {datastore} → {other_datastore}")
|
||||||
|
success, data = await call_tool(session, "storage_vmotion", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"target_datastore": other_datastore,
|
||||||
|
})
|
||||||
|
results.record("storage_vmotion", success, data if not success else None)
|
||||||
|
|
||||||
|
# Move back to original datastore
|
||||||
|
if success:
|
||||||
|
await call_tool(session, "storage_vmotion", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
"target_datastore": datastore,
|
||||||
|
})
|
||||||
|
else:
|
||||||
|
results.skip("storage_vmotion", "No alternate datastore found")
|
||||||
|
else:
|
||||||
|
results.skip("storage_vmotion", "Only one datastore available")
|
||||||
|
|
||||||
|
# Convert VM to template
|
||||||
|
print(f"\n>>> Converting VM to template: {TEST_VM_NAME}")
|
||||||
|
success, data = await call_tool(session, "convert_to_template", {
|
||||||
|
"vm_name": TEST_VM_NAME,
|
||||||
|
})
|
||||||
|
results.record("convert_to_template", success, data if not success else None)
|
||||||
|
is_template = success
|
||||||
|
|
||||||
|
# Deploy from template
|
||||||
|
deployed_vm_name = f"{TEST_VM_NAME}-deployed"
|
||||||
|
if is_template:
|
||||||
|
success, data = await call_tool(session, "deploy_from_template", {
|
||||||
|
"template_name": TEST_VM_NAME,
|
||||||
|
"new_vm_name": deployed_vm_name,
|
||||||
|
"datastore": datastore,
|
||||||
|
})
|
||||||
|
results.record("deploy_from_template", success, data if not success else None)
|
||||||
|
deployed_vm_created = success
|
||||||
|
|
||||||
|
# Clean up deployed VM
|
||||||
|
if deployed_vm_created:
|
||||||
|
await call_tool(session, "delete_vm", {"name": deployed_vm_name})
|
||||||
|
else:
|
||||||
|
results.skip("deploy_from_template", "template conversion failed")
|
||||||
|
|
||||||
|
# Convert template back to VM
|
||||||
|
if is_template:
|
||||||
|
success, data = await call_tool(session, "convert_to_vm", {
|
||||||
|
"template_name": TEST_VM_NAME,
|
||||||
|
})
|
||||||
|
results.record("convert_to_vm", success, data if not success else None)
|
||||||
|
else:
|
||||||
|
results.skip("convert_to_vm", "template conversion failed")
|
||||||
|
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
# CLEANUP
|
||||||
|
# ─────────────────────────────────────────────────────────────
|
||||||
|
print(f"\n{'=' * 60}")
|
||||||
|
print("CLEANUP")
|
||||||
|
print(f"{'=' * 60}")
|
||||||
|
|
||||||
|
if SKIP_CLEANUP:
|
||||||
|
print(f"\n⚠️ Cleanup SKIPPED. Test VM '{TEST_VM_NAME}' remains.")
|
||||||
|
if folder_created:
|
||||||
|
print(f" Test folder '{TEST_FOLDER_NAME}' remains.")
|
||||||
|
else:
|
||||||
|
# Delete test VM
|
||||||
|
print(f"\n>>> Deleting test VM: {TEST_VM_NAME}")
|
||||||
|
success, data = await call_tool(session, "delete_vm", {"name": TEST_VM_NAME})
|
||||||
|
results.record("delete_vm (cleanup)", success, data if not success else None)
|
||||||
|
|
||||||
|
# Note: Folder deletion would require empty folder
|
||||||
|
# In a real scenario, you'd need to handle this
|
||||||
|
if folder_created:
|
||||||
|
print(f" Note: Test folder '{TEST_FOLDER_NAME}' may need manual cleanup")
|
||||||
|
|
||||||
|
# Print summary
|
||||||
|
return results.summary()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
success = asyncio.run(main())
|
||||||
|
sys.exit(0 if success else 1)
|
||||||
Loading…
x
Reference in New Issue
Block a user