add vCenter-specific operations mixin

VCenterOpsMixin provides tools that require vCenter Server:

Storage vMotion:
- storage_vmotion: Move VM disks to different datastore
- move_vm_disk: Move specific disk to different datastore

Template Management:
- convert_to_template: Convert VM to template
- convert_to_vm: Convert template back to VM
- deploy_from_template: Deploy new VM from template

Folder Organization:
- list_folders: List VM folders in datacenter
- create_folder: Create new VM folder
- move_vm_to_folder: Move VM to different folder

Tasks & Events:
- list_recent_tasks: List recent vCenter tasks
- list_recent_events: List recent vCenter events

Cluster Operations:
- list_clusters: List clusters with DRS/HA status
- get_drs_recommendations: Get DRS recommendations for cluster

Also fixes empty-list serialization issue in FastMCP by returning
informative messages when results are empty.

Total: 86 tools, 6 resources
This commit is contained in:
Ryan Malloy 2025-12-26 06:03:19 -07:00
parent 9e39c1c678
commit 359167ec6a
4 changed files with 787 additions and 0 deletions

View File

@ -9,6 +9,7 @@ from esxi_mcp_server.mixins.ovf_management import OVFManagementMixin
from esxi_mcp_server.mixins.power_ops import PowerOpsMixin
from esxi_mcp_server.mixins.resources import ResourcesMixin
from esxi_mcp_server.mixins.snapshots import SnapshotsMixin
from esxi_mcp_server.mixins.vcenter_ops import VCenterOpsMixin
from esxi_mcp_server.mixins.vm_lifecycle import VMLifecycleMixin
__all__ = [
@ -22,4 +23,5 @@ __all__ = [
"NICManagementMixin",
"OVFManagementMixin",
"HostManagementMixin",
"VCenterOpsMixin",
]

View File

@ -0,0 +1,697 @@
"""vCenter-specific Operations - Storage vMotion, Templates, Folders, Tasks."""
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool
from mcp.types import ToolAnnotations
from pyVmomi import vim
if TYPE_CHECKING:
from esxi_mcp_server.connection import VMwareConnection
class VCenterOpsMixin(MCPMixin):
"""vCenter-specific operations (require vCenter, not just ESXi)."""
def __init__(self, conn: "VMwareConnection"):
self.conn = conn
# ─────────────────────────────────────────────────────────────────────────────
# Storage vMotion (works even on single-host vCenter)
# ─────────────────────────────────────────────────────────────────────────────
@mcp_tool(
name="storage_vmotion",
description="Move a VM's disks to a different datastore (Storage vMotion)",
annotations=ToolAnnotations(destructiveHint=True),
)
def storage_vmotion(
self,
vm_name: str,
target_datastore: str,
thin_provision: bool | None = None,
) -> dict[str, Any]:
"""Move a VM's storage to a different datastore.
This moves all VM files (disks, config) to the target datastore.
VM can be running during the migration.
Args:
vm_name: Name of the VM to migrate
target_datastore: Target datastore name
thin_provision: Convert to thin provisioning (None = keep current)
Returns:
Dict with migration details
"""
vm = self.conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
ds = self.conn.find_datastore(target_datastore)
if not ds:
raise ValueError(f"Datastore '{target_datastore}' not found")
# Get current datastore
current_ds = vm.config.files.vmPathName.split("]")[0].strip("[")
if current_ds == target_datastore:
return {
"vm": vm_name,
"action": "no_migration_needed",
"message": f"VM is already on datastore '{target_datastore}'",
}
# Create relocate spec
relocate_spec = vim.vm.RelocateSpec()
relocate_spec.datastore = ds
# Set disk provisioning if specified
if thin_provision is not None:
if thin_provision:
relocate_spec.transform = vim.vm.RelocateSpec.Transformation.sparse
else:
relocate_spec.transform = vim.vm.RelocateSpec.Transformation.flat
# Perform the relocation
task = vm.RelocateVM_Task(spec=relocate_spec)
self.conn.wait_for_task(task)
return {
"vm": vm_name,
"action": "storage_vmotion_complete",
"source_datastore": current_ds,
"target_datastore": target_datastore,
"thin_provision": thin_provision,
}
@mcp_tool(
name="move_vm_disk",
description="Move a specific VM disk to a different datastore",
annotations=ToolAnnotations(destructiveHint=True),
)
def move_vm_disk(
self,
vm_name: str,
disk_label: str,
target_datastore: str,
) -> dict[str, Any]:
"""Move a specific VM disk to a different datastore.
Args:
vm_name: Name of the VM
disk_label: Label of the disk (e.g., 'Hard disk 1')
target_datastore: Target datastore name
Returns:
Dict with migration details
"""
vm = self.conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
ds = self.conn.find_datastore(target_datastore)
if not ds:
raise ValueError(f"Datastore '{target_datastore}' not found")
# Find the specific disk
target_disk = None
for device in vm.config.hardware.device:
if isinstance(device, vim.vm.device.VirtualDisk) and device.deviceInfo.label.lower() == disk_label.lower():
target_disk = device
break
if not target_disk:
available = [
d.deviceInfo.label
for d in vm.config.hardware.device
if isinstance(d, vim.vm.device.VirtualDisk)
]
raise ValueError(f"Disk '{disk_label}' not found. Available: {available}")
# Get current disk location
current_path = target_disk.backing.fileName
current_ds = current_path.split("]")[0].strip("[")
# Create disk locator for this specific disk
disk_locator = vim.vm.RelocateSpec.DiskLocator()
disk_locator.diskId = target_disk.key
disk_locator.datastore = ds
# Create relocate spec with just this disk
relocate_spec = vim.vm.RelocateSpec()
relocate_spec.disk = [disk_locator]
# Perform the relocation
task = vm.RelocateVM_Task(spec=relocate_spec)
self.conn.wait_for_task(task)
return {
"vm": vm_name,
"action": "disk_moved",
"disk": disk_label,
"source_datastore": current_ds,
"target_datastore": target_datastore,
}
# ─────────────────────────────────────────────────────────────────────────────
# Template Management
# ─────────────────────────────────────────────────────────────────────────────
@mcp_tool(
name="convert_to_template",
description="Convert a VM to a template",
annotations=ToolAnnotations(destructiveHint=True),
)
def convert_to_template(self, vm_name: str) -> dict[str, Any]:
"""Convert a VM to a template.
The VM must be powered off. Once converted, it cannot be powered on
until converted back to a VM.
Args:
vm_name: Name of the VM to convert
Returns:
Dict with conversion details
"""
vm = self.conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
if vm.runtime.powerState != vim.VirtualMachinePowerState.poweredOff:
raise ValueError("VM must be powered off to convert to template")
if vm.config.template:
return {
"vm": vm_name,
"action": "already_template",
"is_template": True,
}
vm.MarkAsTemplate()
return {
"vm": vm_name,
"action": "converted_to_template",
"is_template": True,
}
@mcp_tool(
name="convert_to_vm",
description="Convert a template back to a VM",
annotations=ToolAnnotations(destructiveHint=True),
)
def convert_to_vm(
self,
template_name: str,
resource_pool: str | None = None,
) -> dict[str, Any]:
"""Convert a template back to a regular VM.
Args:
template_name: Name of the template
resource_pool: Resource pool for the VM (optional)
Returns:
Dict with conversion details
"""
vm = self.conn.find_vm(template_name)
if not vm:
raise ValueError(f"Template '{template_name}' not found")
if not vm.config.template:
return {
"vm": template_name,
"action": "already_vm",
"is_template": False,
}
# Get resource pool
if resource_pool:
pool = self._find_resource_pool(resource_pool)
if not pool:
raise ValueError(f"Resource pool '{resource_pool}' not found")
else:
pool = self.conn.resource_pool
# Get a host from the resource pool
host = None
if hasattr(pool, "owner") and hasattr(pool.owner, "host"):
hosts = pool.owner.host
if hosts:
host = hosts[0]
vm.MarkAsVirtualMachine(pool=pool, host=host)
return {
"vm": template_name,
"action": "converted_to_vm",
"is_template": False,
}
def _find_resource_pool(self, name: str) -> vim.ResourcePool | None:
"""Find a resource pool by name."""
container = self.conn.content.viewManager.CreateContainerView(
self.conn.content.rootFolder, [vim.ResourcePool], True
)
try:
for pool in container.view:
if pool.name == name:
return pool
finally:
container.Destroy()
return None
@mcp_tool(
name="deploy_from_template",
description="Deploy a new VM from a template",
annotations=ToolAnnotations(destructiveHint=True),
)
def deploy_from_template(
self,
template_name: str,
new_vm_name: str,
datastore: str | None = None,
power_on: bool = False,
) -> dict[str, Any]:
"""Deploy a new VM from a template.
Args:
template_name: Name of the template to clone
new_vm_name: Name for the new VM
datastore: Target datastore (default: same as template)
power_on: Power on after deployment (default False)
Returns:
Dict with deployment details
"""
template = self.conn.find_vm(template_name)
if not template:
raise ValueError(f"Template '{template_name}' not found")
if not template.config.template:
raise ValueError(f"'{template_name}' is not a template")
# Check if target VM already exists
if self.conn.find_vm(new_vm_name):
raise ValueError(f"VM '{new_vm_name}' already exists")
# Build clone spec
relocate_spec = vim.vm.RelocateSpec()
relocate_spec.pool = self.conn.resource_pool
if datastore:
ds = self.conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
relocate_spec.datastore = ds
clone_spec = vim.vm.CloneSpec()
clone_spec.location = relocate_spec
clone_spec.powerOn = power_on
clone_spec.template = False # Create VM, not another template
# Get target folder
folder = self.conn.datacenter.vmFolder
# Clone the template
task = template.Clone(folder=folder, name=new_vm_name, spec=clone_spec)
self.conn.wait_for_task(task)
# Get the new VM info
new_vm = self.conn.find_vm(new_vm_name)
return {
"vm": new_vm_name,
"action": "deployed_from_template",
"template": template_name,
"datastore": datastore or "same as template",
"power_state": str(new_vm.runtime.powerState) if new_vm else "unknown",
}
# ─────────────────────────────────────────────────────────────────────────────
# Folder Organization
# ─────────────────────────────────────────────────────────────────────────────
@mcp_tool(
name="list_folders",
description="List VM folders in the datacenter",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_folders(self) -> list[dict[str, Any]]:
"""List all VM folders in the datacenter.
Returns:
List of folder details
"""
folders = []
def _collect_folders(folder: vim.Folder, path: str = ""):
current_path = f"{path}/{folder.name}" if path else folder.name
folders.append({
"name": folder.name,
"path": current_path,
"type": "Folder",
"children": len(folder.childEntity) if hasattr(folder, "childEntity") else 0,
})
if hasattr(folder, "childEntity"):
for child in folder.childEntity:
if isinstance(child, vim.Folder):
_collect_folders(child, current_path)
# Start from VM folder
vm_folder = self.conn.datacenter.vmFolder
_collect_folders(vm_folder)
return folders
@mcp_tool(
name="create_folder",
description="Create a new VM folder",
annotations=ToolAnnotations(destructiveHint=True),
)
def create_folder(
self,
folder_name: str,
parent_path: str | None = None,
) -> dict[str, Any]:
"""Create a new VM folder.
Args:
folder_name: Name for the new folder
parent_path: Path to parent folder (None = root vm folder)
Returns:
Dict with folder details
"""
if parent_path:
parent = self._find_folder_by_path(parent_path)
if not parent:
raise ValueError(f"Parent folder '{parent_path}' not found")
else:
parent = self.conn.datacenter.vmFolder
parent.CreateFolder(name=folder_name)
return {
"action": "folder_created",
"name": folder_name,
"parent": parent_path or "vm (root)",
"path": f"{parent_path}/{folder_name}" if parent_path else f"vm/{folder_name}",
}
def _find_folder_by_path(self, path: str) -> vim.Folder | None:
"""Find a folder by its path (e.g., 'vm/Production/WebServers')."""
parts = [p for p in path.split("/") if p and p != "vm"]
current = self.conn.datacenter.vmFolder
for part in parts:
found = None
if hasattr(current, "childEntity"):
for child in current.childEntity:
if isinstance(child, vim.Folder) and child.name == part:
found = child
break
if not found:
return None
current = found
return current
@mcp_tool(
name="move_vm_to_folder",
description="Move a VM to a different folder",
annotations=ToolAnnotations(destructiveHint=True),
)
def move_vm_to_folder(
self,
vm_name: str,
folder_path: str,
) -> dict[str, Any]:
"""Move a VM to a different folder.
Args:
vm_name: Name of the VM to move
folder_path: Path to target folder
Returns:
Dict with move details
"""
vm = self.conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
folder = self._find_folder_by_path(folder_path)
if not folder:
raise ValueError(f"Folder '{folder_path}' not found")
# Get current folder
current_folder = vm.parent.name if vm.parent else "unknown"
# Move the VM
task = folder.MoveIntoFolder_Task([vm])
self.conn.wait_for_task(task)
return {
"vm": vm_name,
"action": "moved_to_folder",
"from_folder": current_folder,
"to_folder": folder_path,
}
# ─────────────────────────────────────────────────────────────────────────────
# vCenter Tasks and Events
# ─────────────────────────────────────────────────────────────────────────────
@mcp_tool(
name="list_recent_tasks",
description="List recent tasks from vCenter",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_recent_tasks(
self,
max_count: int = 20,
entity_name: str | None = None,
) -> list[dict[str, Any]]:
"""List recent tasks from vCenter.
Args:
max_count: Maximum number of tasks to return (default 20)
entity_name: Filter by entity name (optional)
Returns:
List of task details
"""
task_manager = self.conn.content.taskManager
recent_tasks = task_manager.recentTask
tasks = []
for task in recent_tasks[:max_count]:
task_info = {
"key": task.info.key,
"name": task.info.name or task.info.descriptionId,
"state": str(task.info.state),
"progress": task.info.progress,
"queued_time": str(task.info.queueTime) if task.info.queueTime else None,
"start_time": str(task.info.startTime) if task.info.startTime else None,
"complete_time": str(task.info.completeTime) if task.info.completeTime else None,
}
# Add entity info if available
if task.info.entity:
task_info["entity"] = task.info.entity.name
task_info["entity_type"] = type(task.info.entity).__name__
# Add error info if failed
if task.info.error:
task_info["error"] = str(task.info.error.msg)
# Filter by entity if specified
if entity_name and task.info.entity and task.info.entity.name != entity_name:
continue
tasks.append(task_info)
# Ensure we return something even if empty
if not tasks:
return [{"message": "No recent tasks found", "count": 0}]
return tasks
@mcp_tool(
name="list_recent_events",
description="List recent events from vCenter",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_recent_events(
self,
max_count: int = 50,
event_types: list[str] | None = None,
hours_back: int = 24,
) -> list[dict[str, Any]]:
"""List recent events from vCenter.
Args:
max_count: Maximum number of events (default 50)
event_types: Filter by event type names (optional)
hours_back: How many hours back to look (default 24)
Returns:
List of event details
"""
event_manager = self.conn.content.eventManager
# Create filter spec
filter_spec = vim.event.EventFilterSpec()
filter_spec.time = vim.event.EventFilterSpec.ByTime()
filter_spec.time.beginTime = datetime.now() - timedelta(hours=hours_back)
# Get events
event_collector = event_manager.CreateCollectorForEvents(filter=filter_spec)
try:
events = event_collector.ReadNextEvents(max_count)
result = []
for event in events:
event_info = {
"key": event.key,
"type": type(event).__name__,
"created_time": str(event.createdTime),
"message": event.fullFormattedMessage,
"user": event.userName if hasattr(event, "userName") else None,
}
# Add entity info if available
if hasattr(event, "vm") and event.vm:
event_info["vm"] = event.vm.name
if hasattr(event, "host") and event.host:
event_info["host"] = event.host.name
# Filter by type if specified
if event_types and type(event).__name__ not in event_types:
continue
result.append(event_info)
# Ensure we return something even if empty
if not result:
return [{"message": f"No events found in the last {hours_back} hours", "count": 0}]
return result
finally:
event_collector.DestroyCollector()
# ─────────────────────────────────────────────────────────────────────────────
# Cluster Operations (for multi-host environments)
# ─────────────────────────────────────────────────────────────────────────────
@mcp_tool(
name="list_clusters",
description="List all clusters in the datacenter",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_clusters(self) -> list[dict[str, Any]]:
"""List all clusters in the datacenter.
Returns:
List of cluster details with DRS/HA status
"""
clusters = []
for entity in self.conn.datacenter.hostFolder.childEntity:
if isinstance(entity, vim.ClusterComputeResource):
drs_config = entity.configuration.drsConfig
ha_config = entity.configuration.dasConfig
clusters.append({
"name": entity.name,
"host_count": len(entity.host) if entity.host else 0,
"total_cpu_mhz": entity.summary.totalCpu,
"total_memory_gb": round(entity.summary.totalMemory / (1024**3), 2),
"effective_cpu_mhz": entity.summary.effectiveCpu,
"effective_memory_gb": round(entity.summary.effectiveMemory / 1024, 2),
"drs": {
"enabled": drs_config.enabled if drs_config else False,
"behavior": str(drs_config.defaultVmBehavior) if drs_config else None,
},
"ha": {
"enabled": ha_config.enabled if ha_config else False,
"admission_control": ha_config.admissionControlEnabled if ha_config else False,
},
})
# Return informative message if no clusters found (standalone host mode)
if not clusters:
return [{
"message": "No clusters found - this appears to be a standalone host or non-clustered environment",
"count": 0,
}]
return clusters
@mcp_tool(
name="get_drs_recommendations",
description="Get DRS recommendations for a cluster",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_drs_recommendations(
self,
cluster_name: str,
) -> list[dict[str, Any]]:
"""Get DRS recommendations for a cluster.
Args:
cluster_name: Name of the cluster
Returns:
List of DRS recommendations
"""
cluster = self._find_cluster(cluster_name)
if not cluster:
raise ValueError(f"Cluster '{cluster_name}' not found")
if not cluster.configuration.drsConfig.enabled:
return [{
"message": "DRS is not enabled for this cluster",
"cluster": cluster_name,
}]
recommendations = []
if hasattr(cluster, "recommendation") and cluster.recommendation:
for rec in cluster.recommendation:
rec_info = {
"key": rec.key,
"reason": rec.reason,
"rating": rec.rating,
"type": rec.reasonText,
}
# Add action details
if rec.action:
rec_info["actions"] = []
for action in rec.action:
if hasattr(action, "target"):
rec_info["actions"].append({
"type": type(action).__name__,
"target": action.target.name if action.target else "Unknown",
})
recommendations.append(rec_info)
if not recommendations:
return [{
"message": "No DRS recommendations at this time",
"cluster": cluster_name,
}]
return recommendations
def _find_cluster(self, name: str) -> vim.ClusterComputeResource | None:
"""Find a cluster by name."""
for entity in self.conn.datacenter.hostFolder.childEntity:
if isinstance(entity, vim.ClusterComputeResource) and entity.name == name:
return entity
return None

View File

@ -18,6 +18,7 @@ from esxi_mcp_server.mixins import (
PowerOpsMixin,
ResourcesMixin,
SnapshotsMixin,
VCenterOpsMixin,
VMLifecycleMixin,
)
@ -76,6 +77,7 @@ def create_server(settings: Settings | None = None) -> FastMCP:
NICManagementMixin(conn),
OVFManagementMixin(conn),
HostManagementMixin(conn),
VCenterOpsMixin(conn),
]
tool_count = 0

View File

@ -207,6 +207,92 @@ async def main():
print(f"Error: {e}")
print()
# ─────────────────────────────────────────────────────────────────────────
# vCenter-Specific Tests (NEW!)
# ─────────────────────────────────────────────────────────────────────────
# Test 8: List Folders
print("=== Test 8: List Folders (vCenter) ===")
try:
result = await session.call_tool("list_folders", {})
folders = json.loads(result.content[0].text)
print(f"Found {len(folders)} folders:")
for f in folders[:10]:
print(f" 📁 {f['path']} ({f.get('children', 0)} children)")
except Exception as e:
print(f"Error: {e}")
print()
# Test 9: List Clusters
print("=== Test 9: List Clusters (vCenter) ===")
try:
result = await session.call_tool("list_clusters", {})
clusters = json.loads(result.content[0].text)
# Check if it's an empty-result message
if clusters and 'message' in clusters[0]:
print(f" {clusters[0]['message']}")
elif clusters:
print(f"Found {len(clusters)} clusters:")
for c in clusters:
print(f" 🖥️ {c['name']}: {c.get('host_count', 0)} hosts")
print(f" DRS: {c.get('drs', {}).get('enabled', 'N/A')}, HA: {c.get('ha', {}).get('enabled', 'N/A')}")
else:
print(" No clusters found (standalone host mode)")
except Exception as e:
print(f"Error: {e}")
print()
# Test 10: List Recent Tasks
print("=== Test 10: Recent Tasks (vCenter) ===")
try:
result = await session.call_tool("list_recent_tasks", {"max_count": 5})
tasks = json.loads(result.content[0].text)
# Check if it's an empty-result message
if tasks and 'message' in tasks[0]:
print(f" {tasks[0]['message']}")
else:
print(f"Found {len(tasks)} recent tasks:")
for t in tasks[:5]:
entity = t.get('entity', 'N/A')
print(f" 📋 {t.get('name', 'N/A')} - {t.get('state', 'N/A')}")
print(f" Entity: {entity} | Started: {t.get('start_time', 'N/A')}")
except Exception as e:
print(f"Error: {e}")
print()
# Test 11: Get Alarms
print("=== Test 11: Get Alarms (vCenter) ===")
try:
result = await session.call_tool("get_alarms", {})
alarms = json.loads(result.content[0].text)
if alarms:
print(f"Found {len(alarms)} active alarms:")
for a in alarms[:5]:
print(f" ⚠️ {a.get('alarm', 'N/A')} on {a.get('entity', 'N/A')}")
print(f" Status: {a.get('status', 'N/A')}, Acknowledged: {a.get('acknowledged', 'N/A')}")
else:
print(" No active alarms (all clear!)")
except Exception as e:
print(f"Error: {e}")
print()
# Test 12: List Recent Events
print("=== Test 12: Recent Events (vCenter) ===")
try:
result = await session.call_tool("list_recent_events", {"max_count": 5, "hours_back": 24})
events = json.loads(result.content[0].text)
print(f"Found {len(events)} events in last 24h:")
for e in events[:5]:
vm_info = f" (VM: {e['vm']})" if e.get('vm') else ""
print(f" 📌 {e.get('type', 'N/A')}{vm_info}")
msg = e.get('message', 'N/A')
if len(msg) > 80:
msg = msg[:77] + "..."
print(f" {msg}")
except Exception as e:
print(f"Error: {e}")
print()
print("✅ All tests completed!")