721 lines
33 KiB
Python
721 lines
33 KiB
Python
import argparse
import hmac
import json
import logging
import os
import ssl
import time
from dataclasses import dataclass, field
from typing import Optional, Dict, Any

# MCP protocol related imports
from mcp.server.lowlevel import Server  # MCP server base class
from mcp.server.sse import SseServerTransport  # SSE transport support
from mcp import types  # MCP type definitions

# pyVmomi VMware API imports
from pyVim import connect
from pyVmomi import vim, vmodl
|
|
|
|
# Configuration data class for storing configuration options
|
|
@dataclass
class Config:
    """Runtime configuration for the VMware MCP server.

    The three ``vcenter_*`` connection fields are required; everything else is
    optional. Secrets (``vcenter_password`` and ``api_key``) are excluded from
    the generated ``repr`` so that logging or printing a ``Config`` instance
    does not leak them.
    """

    vcenter_host: str
    vcenter_user: str
    vcenter_password: str = field(repr=False)  # Kept out of repr() to avoid leaking it
    datacenter: Optional[str] = None  # Datacenter name (optional)
    cluster: Optional[str] = None  # Cluster name (optional)
    datastore: Optional[str] = None  # Datastore name (optional)
    network: Optional[str] = None  # Virtual network name (optional)
    insecure: bool = False  # Whether to skip SSL certificate verification (default: False)
    api_key: Optional[str] = field(default=None, repr=False)  # API access key for authentication
    log_file: Optional[str] = None  # Log file path (if not specified, output to console)
    log_level: str = "INFO"  # Log level
|
|
|
|
# VMware management class, encapsulating pyVmomi operations for vSphere
|
|
class VMwareManager:
    """Manage vSphere virtual machines through pyVmomi.

    Constructing the manager connects to the configured vCenter/ESXi host and
    resolves the datacenter, resource pool, datastore and (optional) network
    that the VM operations use as defaults.
    """

    # Seconds to sleep between two polls of a running vSphere task. Polling
    # without a pause would spin a CPU core for the whole task duration.
    TASK_POLL_INTERVAL = 0.5

    def __init__(self, config: Config):
        """Store the configuration and immediately connect to vCenter/ESXi."""
        self.config = config
        self.si = None  # Service instance (ServiceInstance)
        self.content = None  # vSphere content root
        self.datacenter_obj = None
        self.resource_pool = None
        self.datastore_obj = None
        self.network_obj = None
        self.authenticated = False  # Authentication flag for API key verification
        self._connect_vcenter()

    # ------------------------------------------------------------------
    # Connection and default-resource resolution
    # ------------------------------------------------------------------

    def _connect_vcenter(self):
        """Connect to vCenter/ESXi and retrieve main resource object references.

        Raises:
            Exception: if the connection fails, or if the datacenter, compute
                resource, datastore or configured network cannot be found.
        """
        try:
            if self.config.insecure:
                # Connection method without SSL certificate verification
                context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                context.check_hostname = False  # Disable hostname checking
                context.verify_mode = ssl.CERT_NONE
                self.si = connect.SmartConnect(
                    host=self.config.vcenter_host,
                    user=self.config.vcenter_user,
                    pwd=self.config.vcenter_password,
                    sslContext=context)
            else:
                # Standard SSL verification connection
                self.si = connect.SmartConnect(
                    host=self.config.vcenter_host,
                    user=self.config.vcenter_user,
                    pwd=self.config.vcenter_password)
        except Exception as e:
            logging.error(f"Failed to connect to vCenter/ESXi: {e}")
            raise

        # Retrieve content root object
        self.content = self.si.RetrieveContent()
        logging.info("Successfully connected to VMware vCenter/ESXi API")

        # The later lookups all search inside the datacenter, so it goes first.
        self.datacenter_obj = self._resolve_datacenter()

        self.resource_pool = self._resolve_resource_pool()
        logging.info(f"Using resource pool: {self.resource_pool.name}")

        self.datastore_obj = self._resolve_datastore()
        logging.info(f"Using datastore: {self.datastore_obj.name}")

        self.network_obj = self._resolve_network()

    def _resolve_datacenter(self):
        """Return the configured datacenter, or the first one when none is configured."""
        datacenters = [dc for dc in self.content.rootFolder.childEntity
                       if isinstance(dc, vim.Datacenter)]
        wanted = self.config.datacenter
        if wanted:
            match = next((dc for dc in datacenters if dc.name == wanted), None)
            if not match:
                logging.error(f"Datacenter named {wanted} not found")
                raise Exception(f"Datacenter {wanted} not found")
            return match
        # Default to the first available datacenter
        if not datacenters:
            raise Exception("No datacenter object found")
        return datacenters[0]

    def _resolve_resource_pool(self):
        """Return the resource pool of the configured cluster or of the first compute resource."""
        candidates = self.datacenter_obj.hostFolder.childEntity
        wanted = self.config.cluster
        if wanted:
            compute_resource = next(
                (cr for cr in candidates
                 if isinstance(cr, vim.ClusterComputeResource) and cr.name == wanted),
                None)
            if not compute_resource:
                logging.error(f"Cluster named {wanted} not found")
                raise Exception(f"Cluster {wanted} not found")
        else:
            # Default to the first ComputeResource (cluster or standalone host)
            compute_resource = next(
                (cr for cr in candidates if isinstance(cr, vim.ComputeResource)), None)
            if not compute_resource:
                raise Exception("No compute resource (cluster or host) found")
        return compute_resource.resourcePool

    def _find_datastore(self, name: str):
        """Return the datastore called ``name`` in the datacenter, or None."""
        return next((ds for ds in self.datacenter_obj.datastoreFolder.childEntity
                     if isinstance(ds, vim.Datastore) and ds.name == name), None)

    def _find_network(self, name: str):
        """Return the network (or portgroup) called ``name`` in the datacenter, or None."""
        return next((net for net in self.datacenter_obj.networkFolder.childEntity
                     if net.name == name), None)

    def _resolve_datastore(self):
        """Return the configured datastore, or the one with the most free space."""
        wanted = self.config.datastore
        if wanted:
            match = self._find_datastore(wanted)
            if not match:
                logging.error(f"Datastore named {wanted} not found")
                raise Exception(f"Datastore {wanted} not found")
            return match
        datastores = [ds for ds in self.datacenter_obj.datastoreFolder.childEntity
                      if isinstance(ds, vim.Datastore)]
        if not datastores:
            raise Exception("No available datastore found in the datacenter")
        # Select the one with the maximum free space
        return max(datastores, key=lambda ds: ds.summary.freeSpace)

    def _resolve_network(self):
        """Return the configured network, or None when no network is configured."""
        wanted = self.config.network
        if not wanted:
            # If no network is specified, VM creation can choose to not connect to a network
            return None
        match = self._find_network(wanted)
        if not match:
            logging.error(f"Network {wanted} not found")
            raise Exception(f"Network {wanted} not found")
        logging.info(f"Using network: {match.name}")
        return match

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    def _wait_for_task(self, task):
        """Block until ``task`` finishes and raise its fault if it failed.

        The task state is polled every ``TASK_POLL_INTERVAL`` seconds.
        """
        finished = (vim.TaskInfo.State.success, vim.TaskInfo.State.error)
        while task.info.state not in finished:
            time.sleep(self.TASK_POLL_INTERVAL)
        if task.info.state == vim.TaskInfo.State.error:
            raise task.info.error

    def _vm_container_view(self):
        """Create a recursive container view over every VirtualMachine below the root folder."""
        return self.content.viewManager.CreateContainerView(
            self.content.rootFolder, [vim.VirtualMachine], True)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def list_vms(self) -> list:
        """List all virtual machine names."""
        container = self._vm_container_view()
        try:
            return [vm.name for vm in container.view]
        finally:
            # Release the server-side view even if iteration fails.
            container.Destroy()

    def find_vm(self, name: str) -> Optional[vim.VirtualMachine]:
        """Find virtual machine object by name; return None when no VM matches."""
        container = self._vm_container_view()
        try:
            return next((vm for vm in container.view if vm.name == name), None)
        finally:
            container.Destroy()

    def get_vm_performance(self, vm_name: str) -> Dict[str, Any]:
        """Retrieve performance data (CPU, memory, storage, and network) for a VM.

        Returns a dict with the keys ``cpu_usage`` (MHz), ``memory_usage`` (MB),
        ``storage_usage`` (GB), ``network_transmit_KBps`` and
        ``network_receive_KBps``. The two network entries are ``None`` when the
        performance counters cannot be queried.

        Raises:
            Exception: if no VM named ``vm_name`` exists.
        """
        vm = self.find_vm(vm_name)
        if not vm:
            raise Exception(f"VM {vm_name} not found")

        # CPU and memory usage (obtained from quickStats)
        stats = {}
        qs = vm.summary.quickStats
        stats["cpu_usage"] = qs.overallCpuUsage  # MHz
        stats["memory_usage"] = qs.guestMemoryUsage  # MB

        # Storage usage (committed storage, in GB)
        committed = vm.summary.storage.committed if vm.summary.storage else 0
        stats["storage_usage"] = round(committed / (1024**3), 2)  # Convert to GB

        try:
            transmitted, received = self._query_network_rates(vm)
            stats["network_transmit_KBps"] = transmitted
            stats["network_receive_KBps"] = received
        except Exception as e:
            # If obtaining performance counters fails, log the error but do not terminate
            logging.warning(f"Failed to retrieve network performance data: {e}")
            stats["network_transmit_KBps"] = None
            stats["network_receive_KBps"] = None
        return stats

    def _query_network_rates(self, vm):
        """Return ``(transmitted, received)`` from the latest network counter sample of ``vm``.

        Counters are matched by their full name rather than by list position, so
        the result does not depend on the order in which the performance manager
        enumerates its counters. Returns ``(0, 0)`` when neither counter exists.
        """
        pm = self.content.perfManager
        wanted = {"net.transmitted.average": "tx", "net.received.average": "rx"}
        direction_by_counter = {}
        for c in pm.perfCounter:
            counter_full_name = f"{c.groupInfo.key}.{c.nameInfo.key}.{c.rollupType}"
            if counter_full_name in wanted:
                direction_by_counter[c.key] = wanted[counter_full_name]
        if not direction_by_counter:
            return 0, 0

        query = vim.PerformanceManager.QuerySpec(
            maxSample=1,
            entity=vm,
            metricId=[vim.PerformanceManager.MetricId(counterId=cid, instance="*")
                      for cid in direction_by_counter])
        stats_res = pm.QueryStats(querySpec=[query])

        # NOTE(review): instance="*" is expected to yield one series per NIC and
        # possibly an aggregate series whose instance name is empty - confirm
        # against the vSphere PerformanceManager documentation. The aggregate
        # is preferred when present so per-NIC values are not counted twice.
        aggregate = {}
        per_nic = {"tx": 0, "rx": 0}
        for series in stats_res[0].value:
            direction = direction_by_counter.get(series.id.counterId)
            if direction is None:
                continue
            value = sum(series.value)
            if series.id.instance:
                per_nic[direction] += value
            else:
                aggregate[direction] = value
        return (aggregate.get("tx", per_nic["tx"]),
                aggregate.get("rx", per_nic["rx"]))

    # ------------------------------------------------------------------
    # VM lifecycle operations
    # ------------------------------------------------------------------

    def create_vm(self, name: str, cpus: int, memory_mb: int, datastore: Optional[str] = None,
                  network: Optional[str] = None, disk_gb: int = 10) -> str:
        """Create a new virtual machine (from scratch, with an empty disk and optional network).

        Args:
            name: Name of the new VM.
            cpus: Number of virtual CPUs.
            memory_mb: Memory size in MB.
            datastore: Datastore name; defaults to the manager's datastore.
            network: Network name; defaults to the manager's network (may be none).
            disk_gb: Size of the thin-provisioned disk in GB (default 10).

        Returns:
            A confirmation message.

        Raises:
            Exception: if the named datastore/network is missing, or the
                creation task fails (the task's fault is re-raised).
        """
        # If a specific datastore or network is provided, use it instead of the defaults
        datastore_obj = self.datastore_obj
        network_obj = self.network_obj
        if datastore:
            datastore_obj = self._find_datastore(datastore)
            if not datastore_obj:
                raise Exception(f"Specified datastore {datastore} not found")
        if network:
            network_obj = self._find_network(network)
            if not network_obj:
                raise Exception(f"Specified network {network} not found")

        # Build VM configuration specification (guestId can be adjusted as needed)
        vm_spec = vim.vm.ConfigSpec(name=name, memoryMB=memory_mb, numCPUs=cpus, guestId="otherGuest")

        controller_spec = self._build_scsi_controller_spec()
        device_specs = [
            controller_spec,
            self._build_disk_spec(datastore_obj, controller_spec.device.key, disk_gb),
        ]
        # If a network is provided, add a virtual network adapter
        if network_obj:
            device_specs.append(self._build_nic_spec(network_obj))
        vm_spec.deviceChange = device_specs

        # Place the VM in the datacenter's vmFolder, inside the manager's resource pool
        vm_folder = self.datacenter_obj.vmFolder
        try:
            task = vm_folder.CreateVM_Task(config=vm_spec, pool=self.resource_pool)
            self._wait_for_task(task)
        except Exception as e:
            logging.error(f"Failed to create virtual machine: {e}")
            raise
        logging.info(f"Virtual machine created: {name}")
        return f"VM '{name}' created."

    @staticmethod
    def _build_scsi_controller_spec():
        """Build the device spec that adds a ParaVirtual SCSI controller."""
        controller_spec = vim.vm.device.VirtualDeviceSpec()
        controller_spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add
        controller_spec.device = vim.vm.device.ParaVirtualSCSIController()
        controller_spec.device.deviceInfo = vim.Description(
            label="SCSI Controller", summary="ParaVirtual SCSI Controller")
        controller_spec.device.busNumber = 0
        controller_spec.device.sharedBus = vim.vm.device.VirtualSCSIController.Sharing.noSharing
        # Temporary negative key so the disk can reference this controller
        controller_spec.device.key = -101
        return controller_spec

    @staticmethod
    def _build_disk_spec(datastore_obj, controller_key, disk_gb):
        """Build the device spec that creates a thin-provisioned disk of ``disk_gb`` GB."""
        disk_spec = vim.vm.device.VirtualDeviceSpec()
        disk_spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add
        disk_spec.fileOperation = vim.vm.device.VirtualDeviceSpec.FileOperation.create
        disk_spec.device = vim.vm.device.VirtualDisk()
        disk_spec.device.capacityInKB = 1024 * 1024 * disk_gb
        disk_spec.device.deviceInfo = vim.Description(label="Hard Disk 1", summary=f"{disk_gb} GB disk")
        disk_spec.device.backing = vim.vm.device.VirtualDisk.FlatVer2BackingInfo()
        disk_spec.device.backing.diskMode = "persistent"
        disk_spec.device.backing.thinProvisioned = True  # Thin provisioning
        disk_spec.device.backing.datastore = datastore_obj
        # Attach the disk to the controller created alongside it
        disk_spec.device.controllerKey = controller_key
        disk_spec.device.unitNumber = 0
        return disk_spec

    @staticmethod
    def _build_nic_spec(network_obj):
        """Build the device spec that adds a VMXNET3 adapter connected to ``network_obj``."""
        nic_spec = vim.vm.device.VirtualDeviceSpec()
        nic_spec.operation = vim.vm.device.VirtualDeviceSpec.Operation.add
        nic_spec.device = vim.vm.device.VirtualVmxnet3()
        nic_spec.device.deviceInfo = vim.Description(label="Network Adapter 1", summary=network_obj.name)
        # The distributed portgroup check must come first: in the vSphere object
        # model DistributedVirtualPortgroup extends Network, so testing for
        # vim.Network first would also match portgroups and give them a
        # standard-network backing.
        if isinstance(network_obj, vim.dvs.DistributedVirtualPortgroup):
            dvs_uuid = network_obj.config.distributedVirtualSwitch.uuid
            port_key = network_obj.key
            nic_spec.device.backing = vim.vm.device.VirtualEthernetCard.DistributedVirtualPortBackingInfo(
                port=vim.dvs.PortConnection(portgroupKey=port_key, switchUuid=dvs_uuid)
            )
        elif isinstance(network_obj, vim.Network):
            nic_spec.device.backing = vim.vm.device.VirtualEthernetCard.NetworkBackingInfo(
                network=network_obj, deviceName=network_obj.name)
        # Any other network type leaves the backing unset, as before.
        nic_spec.device.connectable = vim.vm.device.VirtualDevice.ConnectInfo(
            startConnected=True, allowGuestControl=True)
        return nic_spec

    def clone_vm(self, template_name: str, new_name: str) -> str:
        """Clone a new virtual machine from an existing template or VM.

        The clone is placed in the template's folder (falling back to the
        datacenter's vmFolder), in the template's resource pool (falling back
        to the manager's), on the manager's datastore, and is left powered off.

        Raises:
            Exception: if the template is not found or the clone task fails.
        """
        template_vm = self.find_vm(template_name)
        if not template_vm:
            raise Exception(f"Template virtual machine {template_name} not found")
        vm_folder = template_vm.parent  # Place the new VM in the same folder as the template
        if not isinstance(vm_folder, vim.Folder):
            vm_folder = self.datacenter_obj.vmFolder
        # Use the template's own resource pool when it has one
        resource_pool = template_vm.resourcePool or self.resource_pool
        relocate_spec = vim.vm.RelocateSpec(pool=resource_pool, datastore=self.datastore_obj)
        clone_spec = vim.vm.CloneSpec(powerOn=False, template=False, location=relocate_spec)
        try:
            task = template_vm.Clone(folder=vm_folder, name=new_name, spec=clone_spec)
            self._wait_for_task(task)
        except Exception as e:
            logging.error(f"Failed to clone virtual machine: {e}")
            raise
        logging.info(f"Cloned virtual machine {template_name} to new VM: {new_name}")
        return f"VM '{new_name}' cloned from '{template_name}'."

    def delete_vm(self, name: str) -> str:
        """Delete the specified virtual machine.

        Raises:
            Exception: if the VM is not found or the destroy task fails.
        """
        vm = self.find_vm(name)
        if not vm:
            raise Exception(f"Virtual machine {name} not found")
        try:
            task = vm.Destroy_Task()
            self._wait_for_task(task)
        except Exception as e:
            logging.error(f"Failed to delete virtual machine: {e}")
            raise
        logging.info(f"Virtual machine deleted: {name}")
        return f"VM '{name}' deleted."

    def power_on_vm(self, name: str) -> str:
        """Power on the specified virtual machine (no-op if it is already on).

        Raises:
            Exception: if the VM is not found or the power-on task fails.
        """
        vm = self.find_vm(name)
        if not vm:
            raise Exception(f"Virtual machine {name} not found")
        if vm.runtime.powerState == vim.VirtualMachine.PowerState.poweredOn:
            return f"VM '{name}' is already powered on."
        task = vm.PowerOnVM_Task()
        self._wait_for_task(task)
        logging.info(f"Virtual machine powered on: {name}")
        return f"VM '{name}' powered on."

    def power_off_vm(self, name: str) -> str:
        """Power off the specified virtual machine (no-op if it is already off).

        Raises:
            Exception: if the VM is not found or the power-off task fails.
        """
        vm = self.find_vm(name)
        if not vm:
            raise Exception(f"Virtual machine {name} not found")
        if vm.runtime.powerState == vim.VirtualMachine.PowerState.poweredOff:
            return f"VM '{name}' is already powered off."
        task = vm.PowerOffVM_Task()
        self._wait_for_task(task)
        logging.info(f"Virtual machine powered off: {name}")
        return f"VM '{name}' powered off."
|
|
|
|
# ---------------- MCP Server Definition ----------------
|
|
|
|
# Initialize the MCP Server object that the SSE endpoint runs for each session.
mcp_server = Server(name="VMware-MCP-Server", version="0.0.1")
# The tool and resource functions defined below are thin wrappers that
# delegate to the module-level VMwareManager instance (`manager`).
# Every wrapper except `tool_authenticate` calls `_check_auth()` first: when an
# API key is configured, `_check_auth()` raises unless `manager.authenticated`
# has been set to True by a successful `tool_authenticate` call.
|
|
|
|
# Tool 1: Authentication (via API Key)
|
|
def tool_authenticate(key: str) -> str:
    """Validate the API key and enable subsequent operations upon success.

    Args:
        key: API key supplied by the client.

    Returns:
        A confirmation message when the key matches the configured one.

    Raises:
        Exception: if no API key is configured or the key does not match.
    """
    expected = config.api_key
    # Compare in constant time so response timing does not reveal how much of
    # the key was guessed correctly. Encoding to bytes lets compare_digest
    # accept non-ASCII keys.
    if (
        expected
        and isinstance(expected, str)
        and isinstance(key, str)
        and hmac.compare_digest(key.encode("utf-8"), expected.encode("utf-8"))
    ):
        manager.authenticated = True
        logging.info("API key verification successful, client is authorized")
        return "Authentication successful."
    logging.warning("API key verification failed")
    raise Exception("Authentication failed: invalid API key.")
|
|
|
|
# Tool 2: Create virtual machine
|
|
def tool_create_vm(name: str, cpu: int, memory: int, datastore: str = None, network: str = None) -> str:
    """Authorize the caller, then create a VM through the manager and return its message."""
    _check_auth()  # Raises when the caller has not authenticated
    outcome = manager.create_vm(name, cpu, memory, datastore, network)
    return outcome
|
|
|
|
# Tool 3: Clone virtual machine
|
|
def tool_clone_vm(template_name: str, new_name: str) -> str:
    """Authorize the caller, then clone `template_name` into a new VM called `new_name`."""
    _check_auth()  # Raises when the caller has not authenticated
    outcome = manager.clone_vm(template_name, new_name)
    return outcome
|
|
|
|
# Tool 4: Delete virtual machine
|
|
def tool_delete_vm(name: str) -> str:
    """Authorize the caller, then delete the named VM through the manager."""
    _check_auth()  # Raises when the caller has not authenticated
    outcome = manager.delete_vm(name)
    return outcome
|
|
|
|
# Tool 5: Power on virtual machine
|
|
def tool_power_on(name: str) -> str:
    """Authorize the caller, then power on the named VM through the manager."""
    _check_auth()  # Raises when the caller has not authenticated
    outcome = manager.power_on_vm(name)
    return outcome
|
|
|
|
# Tool 6: Power off virtual machine
|
|
def tool_power_off(name: str) -> str:
    """Authorize the caller, then power off the named VM through the manager."""
    _check_auth()  # Raises when the caller has not authenticated
    outcome = manager.power_off_vm(name)
    return outcome
|
|
|
|
# Tool 7: List all virtual machines
|
|
def tool_list_vms() -> list:
    """Authorize the caller, then return the names of all VMs known to the manager."""
    _check_auth()  # Raises when the caller has not authenticated
    vm_names = manager.list_vms()
    return vm_names
|
|
|
|
# Resource 1: Retrieve virtual machine performance data
|
|
def resource_vm_performance(vm_name: str) -> dict:
    """Authorize the caller, then return the manager's performance statistics for `vm_name`."""
    _check_auth()  # Raises when the caller has not authenticated
    performance = manager.get_vm_performance(vm_name)
    return performance
|
|
|
|
# Internal helper: Check API access permissions
|
|
def _check_auth():
    """Raise unless the caller may run privileged operations.

    Access is always granted when no API key is configured; otherwise
    `manager.authenticated` must already be True.
    """
    key_required = bool(config.api_key)
    if key_required and not manager.authenticated:
        raise Exception("Unauthorized: API key required.")
|
|
|
|
# Register the above functions as tools and resources for the MCP Server
|
|
# Encapsulate using mcp.types.Tool and mcp.types.Resource
|
|
# Tool name -> callable that receives the JSON arguments dict of a tool call
# and returns the tool's result. Kept separate from the `types.Tool`
# descriptors because those are protocol models that get serialized to the
# client and cannot carry Python callables.
tool_handlers = {
    "authenticate": lambda params: tool_authenticate(**params),
    "createVM": lambda params: tool_create_vm(**params),
    "cloneVM": lambda params: tool_clone_vm(**params),
    "deleteVM": lambda params: tool_delete_vm(**params),
    "powerOn": lambda params: tool_power_on(**params),
    "powerOff": lambda params: tool_power_off(**params),
    "listVMs": lambda params: tool_list_vms(),
}

# Tool descriptors advertised to MCP clients (name, description, JSON schema of the arguments).
tools = {
    "authenticate": types.Tool(
        name="authenticate",
        description="Authenticate using API key to enable privileged operations",
        inputSchema={"type": "object", "properties": {"key": {"type": "string"}}, "required": ["key"]},
    ),
    "createVM": types.Tool(
        name="createVM",
        description="Create a new virtual machine",
        inputSchema={
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "cpu": {"type": "integer"},
                "memory": {"type": "integer"},
                "datastore": {"type": "string", "nullable": True},
                "network": {"type": "string", "nullable": True},
            },
            "required": ["name", "cpu", "memory"],
        },
    ),
    "cloneVM": types.Tool(
        name="cloneVM",
        description="Clone a virtual machine from a template or existing VM",
        inputSchema={
            "type": "object",
            "properties": {
                "template_name": {"type": "string"},
                "new_name": {"type": "string"},
            },
            "required": ["template_name", "new_name"],
        },
    ),
    "deleteVM": types.Tool(
        name="deleteVM",
        description="Delete a virtual machine",
        inputSchema={
            "type": "object",
            "properties": {"name": {"type": "string"}},
            "required": ["name"],
        },
    ),
    "powerOn": types.Tool(
        name="powerOn",
        description="Power on a virtual machine",
        inputSchema={
            "type": "object",
            "properties": {"name": {"type": "string"}},
            "required": ["name"],
        },
    ),
    "powerOff": types.Tool(
        name="powerOff",
        description="Power off a virtual machine",
        inputSchema={
            "type": "object",
            "properties": {"name": {"type": "string"}},
            "required": ["name"],
        },
    ),
    "listVMs": types.Tool(
        name="listVMs",
        description="List all virtual machines",
        inputSchema={"type": "object", "properties": {}},
    ),
}

# The VM statistics resource is parameterized by VM name, so it is advertised
# as a resource *template* (a URI with a `{vm_name}` placeholder).
_VMSTATS_URI_PREFIX = "vmstats://"
resources = {
    "vmStats": types.ResourceTemplate(
        name="vmStats",
        uriTemplate=_VMSTATS_URI_PREFIX + "{vm_name}",
        description="Get CPU, memory, storage, network usage of a VM",
        mimeType="application/json",
    )
}


# Register the handlers with the low-level MCP Server. The server only serves
# what is registered through its decorator API; attaching attributes to the
# Server object (as done previously via setattr) does not expose anything.
@mcp_server.list_tools()
async def _handle_list_tools():
    """Return the descriptors of all supported tools."""
    return list(tools.values())


@mcp_server.call_tool()
async def _handle_call_tool(name, arguments):
    """Run the tool called ``name`` with ``arguments`` and return its result as text content.

    String results are returned verbatim; any other result is JSON-encoded.
    NOTE(review): the underlying VMware calls are synchronous and block the
    event loop while they run.
    """
    handler = tool_handlers.get(name)
    if handler is None:
        raise ValueError(f"Unknown tool: {name}")
    result = handler(arguments or {})
    text = result if isinstance(result, str) else json.dumps(result)
    return [types.TextContent(type="text", text=text)]


@mcp_server.list_resource_templates()
async def _handle_list_resource_templates():
    """Return the descriptors of all supported resource templates."""
    return list(resources.values())


@mcp_server.read_resource()
async def _handle_read_resource(uri):
    """Return the JSON-encoded performance statistics for a ``vmstats://<vm_name>`` URI."""
    from urllib.parse import unquote

    raw_uri = str(uri)
    if not raw_uri.startswith(_VMSTATS_URI_PREFIX):
        raise ValueError(f"Unknown resource: {raw_uri}")
    # NOTE(review): the SDK passes a parsed URL object; URL normalization may
    # alter the case of the host part, so VM names with upper-case characters
    # should be verified against a real client.
    vm_name = unquote(raw_uri[len(_VMSTATS_URI_PREFIX):].rstrip("/"))
    return json.dumps(resource_vm_performance(vm_name))


# Kept for backward compatibility with code that reads this attribute.
# NOTE(review): the capabilities actually advertised to clients are presumably
# derived by create_initialization_options() from the handlers registered
# above, not from this attribute - confirm against the MCP SDK.
mcp_server.capabilities = {
    "tools": {"listChanged": True},
    "resources": {"listChanged": True}
}
|
|
|
|
# Module-wide handle to the SSE transport of the current session.
# `sse_endpoint` assigns it when a session is established and resets it to None
# when that session ends; `messages_endpoint` hands POSTed messages to it and
# rejects requests while it is None.
active_transport: Optional[SseServerTransport] = None
|
|
|
|
# SSE initialization request handler (HTTP GET /sse)
|
|
async def sse_endpoint(scope, receive, send):
    """Handle SSE connection initialization requests. Establish an MCP SSE session.

    When an API key is configured, the request must carry it in the
    ``Authorization`` header (bare or as ``Bearer <key>``) or in ``X-API-Key``;
    otherwise a 401 response is sent and no session is created.

    Args:
        scope: ASGI connection scope of the GET request.
        receive: ASGI receive callable.
        send: ASGI send callable.
    """
    global active_transport

    # ASGI delivers headers as a list of (name, value) byte pairs, so they must
    # be looked up by name in a dict; a membership test for a bare name against
    # that list never matches. latin-1 decodes any header byte without error.
    headers_dict = {k.lower().decode("latin-1"): v.decode("latin-1")
                    for (k, v) in scope.get("headers", [])}
    provided_key = headers_dict.get("authorization") or headers_dict.get("x-api-key")

    if config.api_key:
        supplied = (provided_key or "").encode("utf-8")
        accepted = (
            f"Bearer {config.api_key}".encode("utf-8"),
            str(config.api_key).encode("utf-8"),
        )
        # Constant-time comparison avoids leaking key prefixes through timing.
        if not any(hmac.compare_digest(supplied, candidate) for candidate in accepted):
            # If the correct API key is not provided, return 401
            await send({"type": "http.response.start", "status": 401,
                        "headers": [(b"content-type", b"text/plain")]})
            await send({"type": "http.response.body", "body": b"Unauthorized"})
            logging.warning("No valid API key provided, rejecting SSE connection")
            return

    # Establish SSE transport and connect to the MCP Server
    transport = SseServerTransport("/sse/messages")
    active_transport = transport
    logging.info("Established new SSE session")
    try:
        # connect_sse produces the event-stream HTTP response (start and body)
        # itself, so no response messages are sent manually around it.
        async with transport.connect_sse(scope, receive, send) as (read_stream, write_stream):
            init_opts = mcp_server.create_initialization_options()
            # Run MCP Server, passing the read/write streams to the Server
            await mcp_server.run(read_stream, write_stream, init_opts)
    except Exception as e:
        logging.error(f"SSE session encountered an error: {e}")
    finally:
        # Only clear the global if it still refers to this session, so a newer
        # session that replaced it in the meantime is not disconnected.
        if active_transport is transport:
            active_transport = None
|
|
|
|
# JSON-RPC message handler (HTTP POST /sse/messages)
|
|
async def messages_endpoint(scope, receive, send):
    """Handle JSON-RPC requests sent by the client (via POST).

    The body is read and checked to be valid JSON, then replayed to the active
    SSE transport, which injects the message into the MCP session and writes
    the HTTP response itself.

    Responses produced here:
        400 "Invalid JSON"       - the body is not valid UTF-8 JSON.
        400 "No active session"  - no SSE session is currently established.
        500 <error text>         - the transport raised before it started a response.

    Args:
        scope: ASGI connection scope of the POST request.
        receive: ASGI receive callable.
        send: ASGI send callable.
    """
    # Read request body data
    chunks = []
    while True:
        event = await receive()
        if event["type"] == "http.disconnect":
            # The client went away; without this check the loop would never end.
            return
        if event["type"] != "http.request":
            continue
        chunks.append(event.get("body", b""))
        if not event.get("more_body", False):
            break
    body_bytes = b"".join(chunks)

    # Parse JSON-RPC request (validation only; the transport parses it again)
    try:
        json.loads(body_bytes.decode("utf-8"))
    except (UnicodeDecodeError, ValueError) as e:
        logging.error(f"JSON parsing failed: {e}")
        await send({"type": "http.response.start", "status": 400,
                    "headers": [(b"content-type", b"text/plain")]})
        await send({"type": "http.response.body", "body": b"Invalid JSON"})
        return

    # Only accept requests sent through an established SSE transport
    transport = active_transport
    if not transport:
        await send({"type": "http.response.start", "status": 400,
                    "headers": [(b"content-type", b"text/plain")]})
        await send({"type": "http.response.body", "body": b"No active session"})
        return

    # The body has already been consumed, so hand the transport a receive
    # callable that replays it once and then defers to the real one.
    body_replayed = False

    async def replay_receive():
        nonlocal body_replayed
        if not body_replayed:
            body_replayed = True
            return {"type": "http.request", "body": body_bytes, "more_body": False}
        return await receive()

    # Track whether the transport started a response, so the error path below
    # never sends a second response start on the same request.
    response_started = False

    async def tracking_send(message):
        nonlocal response_started
        if message["type"] == "http.response.start":
            response_started = True
        await send(message)

    try:
        # The transport validates the message, forwards it to the MCP session
        # and sends the HTTP response for this POST.
        await transport.handle_post_message(scope, replay_receive, tracking_send)
    except Exception as e:
        logging.error(f"Error handling POST message: {e}")
        if not response_started:
            await send({"type": "http.response.start", "status": 500,
                        "headers": [(b"content-type", b"text/plain")]})
            await send({"type": "http.response.body", "body": str(e).encode("utf-8")})
|
|
|
|
# Simple ASGI application routing: dispatch requests to the appropriate handler based on the path and method
|
|
async def app(scope, receive, send):
    """Minimal ASGI router for the MCP SSE server.

    Routes:
        GET     /sse           -> sse_endpoint
        POST    /sse/messages  -> messages_endpoint
        OPTIONS /sse/messages  -> 204 CORS preflight response
        anything else          -> 404 "Not Found"

    Scopes whose type is not "http" are ignored.
    """
    if scope["type"] != "http":
        # Non-HTTP event, do not process
        return

    request_path = scope.get("path", "")
    verb = scope.get("method", "").upper()

    if request_path == "/sse" and verb == "GET":
        # SSE initialization request
        await sse_endpoint(scope, receive, send)
        return

    if request_path == "/sse/messages":
        if verb == "POST":
            # JSON-RPC message request
            await messages_endpoint(scope, receive, send)
            return
        if verb == "OPTIONS":
            # CORS preflight: advertise the allowed methods and headers
            cors_headers = [
                (b"access-control-allow-methods", b"POST, OPTIONS"),
                (b"access-control-allow-headers", b"Content-Type, Authorization, X-API-Key"),
                (b"access-control-allow-origin", b"*")
            ]
            await send({"type": "http.response.start", "status": 204, "headers": cors_headers})
            await send({"type": "http.response.body", "body": b""})
            return

    # Route not found
    not_found_start = {"type": "http.response.start", "status": 404,
                       "headers": [(b"content-type", b"text/plain")]}
    await send(not_found_start)
    await send({"type": "http.response.body", "body": b"Not Found"})
|
|
|
|
# Parse command-line arguments and environment variables, and load configuration
|
|
# Parse command-line arguments.
parser = argparse.ArgumentParser(description="MCP VMware ESXi Management Server")
parser.add_argument("--config", "-c", help="Configuration file path (JSON or YAML)", default=None)
args = parser.parse_args()

# Load configuration from a file; the path comes from --config or, failing
# that, from the MCP_CONFIG_FILE environment variable.
config_data = {}
config_path = args.config or os.environ.get("MCP_CONFIG_FILE")
if config_path:
    # Parse JSON or YAML based on the file extension
    if config_path.endswith((".yml", ".yaml")):
        import yaml
        with open(config_path, 'r', encoding="utf-8") as f:
            config_data = yaml.safe_load(f)
    elif config_path.endswith(".json"):
        with open(config_path, 'r', encoding="utf-8") as f:
            config_data = json.load(f)
    else:
        raise ValueError("Unsupported configuration file format. Please use JSON or YAML")
    # An empty YAML document loads as None; treat it as "no options set".
    if config_data is None:
        config_data = {}
    if not isinstance(config_data, dict):
        raise ValueError("Configuration file must contain a mapping of option names to values")
    # A quoted value such as "false" is a non-empty string and therefore
    # truthy; normalize it so it cannot silently disable certificate checks.
    if isinstance(config_data.get("insecure"), str):
        config_data["insecure"] = config_data["insecure"].lower() in ("1", "true", "yes")

# Override configuration from environment variables (higher priority than file)
env_map = {
    "VCENTER_HOST": "vcenter_host",
    "VCENTER_USER": "vcenter_user",
    "VCENTER_PASSWORD": "vcenter_password",
    "VCENTER_DATACENTER": "datacenter",
    "VCENTER_CLUSTER": "cluster",
    "VCENTER_DATASTORE": "datastore",
    "VCENTER_NETWORK": "network",
    "VCENTER_INSECURE": "insecure",
    "MCP_API_KEY": "api_key",
    "MCP_LOG_FILE": "log_file",
    "MCP_LOG_LEVEL": "log_level"
}
for env_key, cfg_key in env_map.items():
    if env_key in os.environ:
        val = os.environ[env_key]
        # Boolean type conversion
        if cfg_key == "insecure":
            config_data[cfg_key] = val.lower() in ("1", "true", "yes")
        else:
            config_data[cfg_key] = val

# Construct Config object from config_data, failing early on missing essentials
required_keys = ["vcenter_host", "vcenter_user", "vcenter_password"]
for k in required_keys:
    if k not in config_data or not config_data[k]:
        raise Exception(f"Missing required configuration item: {k}")
config = Config(**config_data)
|
|
|
|
# Initialize logging
|
|
# Initialize logging. Unknown level names fall back to INFO.
log_level = getattr(logging, config.log_level.upper(), logging.INFO)
if not isinstance(log_level, int):
    # The name matched some non-level attribute of the logging module.
    log_level = logging.INFO
# With filename=None, basicConfig installs a console handler itself, so no
# extra StreamHandler is added (a second one would print every record twice).
logging.basicConfig(level=log_level,
                    format="%(asctime)s [%(levelname)s] %(message)s",
                    filename=config.log_file if config.log_file else None)

logging.info("Starting VMware ESXi Management MCP Server...")
# Create VMware Manager instance and connect
manager = VMwareManager(config)

# If an API key is configured, announce that authentication is required before invoking sensitive operations
if config.api_key:
    logging.info("API key authentication is enabled. Clients must call the authenticate tool to verify the key before invoking sensitive operations")
|
|
|
|
# Start ASGI server to listen for MCP SSE connections
|
|
if __name__ == "__main__":
    # Serve the ASGI application with uvicorn on all interfaces, port 8080.
    import uvicorn

    listen_host, listen_port = "0.0.0.0", 8080
    uvicorn.run(app, host=listen_host, port=listen_port)
|