Make all tools host-aware: per-call host selection across every mixin

Rolls the host: str | None = None selector and conn = self._conn(host)
routing across all remaining tools in the 12 mixins (the inventory tools
landed earlier). 97 of 98 tools now accept an optional host to target a
specific managed ESXi server; list_servers is the only exception (it is
the registry itself).

Private helpers that touched self.conn (e.g. ovf NFC/datastore helpers,
nic _get_network_backing, vcenter_ops folder/pool/cluster finders,
host_management _get_host, resources datastore browse/stream) now take a
conn parameter threaded from their tool call sites. Several tools had a
local 'host' variable renamed to avoid shadowing the new selector.

Verified: full server builds (98 tools), and get_host_info/list_hosts
route correctly to two distinct live hosts (205 vs 222).
This commit is contained in:
Ryan Malloy 2026-06-08 05:47:04 -06:00
parent 2930318125
commit 9c02d7238c
13 changed files with 672 additions and 349 deletions

View File

@ -25,7 +25,8 @@ class ConsoleMixin(VSphereMixin):
annotations=ToolAnnotations(readOnlyHint=True),
)
def wait_for_vm_tools(
self, name: str, timeout: int = 120, poll_interval: int = 5
self, name: str, timeout: int = 120, poll_interval: int = 5,
host: str | None = None
) -> dict[str, Any]:
"""Wait for VMware Tools to become available.
@ -33,11 +34,13 @@ class ConsoleMixin(VSphereMixin):
name: VM name
timeout: Maximum seconds to wait (default: 120)
poll_interval: Seconds between status checks (default: 5)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with tools status, version, and guest info when ready
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -76,16 +79,18 @@ class ConsoleMixin(VSphereMixin):
description="Get current VMware Tools status for a VM",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_vm_tools_status(self, name: str) -> dict[str, Any]:
def get_vm_tools_status(self, name: str, host: str | None = None) -> dict[str, Any]:
"""Get VMware Tools status without waiting.
Args:
name: VM name
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with current tools status and guest info
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -115,6 +120,7 @@ class ConsoleMixin(VSphereMixin):
name: str,
width: int | None = None,
height: int | None = None,
host: str | None = None,
) -> dict[str, Any]:
"""Capture VM console screenshot via vSphere HTTP API.
@ -122,19 +128,21 @@ class ConsoleMixin(VSphereMixin):
name: VM name
width: Optional width to scale the image
height: Optional height to scale the image
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with base64-encoded image data and metadata
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
# Build screenshot URL
# Format: https://{host}/screen?id={moid}
host = self.conn.settings.vcenter_host
vcenter_host = conn.settings.vcenter_host
moid = vm._moId
screenshot_url = f"https://{host}/screen?id={moid}"
screenshot_url = f"https://{vcenter_host}/screen?id={moid}"
# Add optional scaling parameters
params = []
@ -146,8 +154,8 @@ class ConsoleMixin(VSphereMixin):
screenshot_url += "&" + "&".join(params)
# Build auth header
username = self.conn.settings.vcenter_user
password = self.conn.settings.vcenter_password.get_secret_value()
username = conn.settings.vcenter_user
password = conn.settings.vcenter_password.get_secret_value()
auth = base64.b64encode(f"{username}:{password}".encode()).decode("ascii")
# Make request
@ -155,7 +163,7 @@ class ConsoleMixin(VSphereMixin):
response = requests.get(
screenshot_url,
headers={"Authorization": f"Basic {auth}"},
verify=not self.conn.settings.vcenter_insecure,
verify=not conn.settings.vcenter_insecure,
timeout=30,
)
response.raise_for_status()

View File

@ -94,6 +94,7 @@ class DiskManagementMixin(VSphereMixin):
iso_path: str | None = None,
iso_datastore: str | None = None,
boot_from_iso: bool = False,
host: str | None = None,
) -> dict[str, Any]:
"""Add a CD/DVD drive to an existing VM.
@ -105,11 +106,13 @@ class DiskManagementMixin(VSphereMixin):
iso_path: ISO path on a datastore (e.g. 'iso/installer.iso') to mount
iso_datastore: Datastore holding the ISO (default: the VM's datastore)
boot_from_iso: Put the CD/DVD first in the boot order (for installers)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with the CD/DVD drive details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -150,7 +153,7 @@ class DiskManagementMixin(VSphereMixin):
)
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -170,6 +173,7 @@ class DiskManagementMixin(VSphereMixin):
size_gb: int,
thin_provisioned: bool = True,
datastore: str | None = None,
host: str | None = None,
) -> dict[str, Any]:
"""Add a new virtual disk to a VM.
@ -178,11 +182,13 @@ class DiskManagementMixin(VSphereMixin):
size_gb: Size of the new disk in GB
thin_provisioned: Use thin provisioning (default True)
datastore: Datastore for the disk (default: same as VM)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with new disk details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -191,7 +197,7 @@ class DiskManagementMixin(VSphereMixin):
# Determine datastore
if datastore:
ds = self.conn.find_datastore(datastore)
ds = conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
ds_name = datastore
@ -206,7 +212,7 @@ class DiskManagementMixin(VSphereMixin):
backing = vim.vm.device.VirtualDisk.FlatVer2BackingInfo()
backing.diskMode = "persistent"
backing.thinProvisioned = thin_provisioned
backing.datastore = self.conn.find_datastore(ds_name)
backing.datastore = conn.find_datastore(ds_name)
# Create the virtual disk
disk = vim.vm.device.VirtualDisk()
@ -227,7 +233,7 @@ class DiskManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -249,6 +255,7 @@ class DiskManagementMixin(VSphereMixin):
vm_name: str,
disk_label: str,
delete_file: bool = False,
host: str | None = None,
) -> dict[str, Any]:
"""Remove a virtual disk from a VM.
@ -256,11 +263,13 @@ class DiskManagementMixin(VSphereMixin):
vm_name: Name of the virtual machine
disk_label: Label of disk to remove (e.g., 'Hard disk 2')
delete_file: Also delete the VMDK file (default False - keep file)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with removal details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -291,7 +300,7 @@ class DiskManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -312,6 +321,7 @@ class DiskManagementMixin(VSphereMixin):
vm_name: str,
disk_label: str,
new_size_gb: int,
host: str | None = None,
) -> dict[str, Any]:
"""Extend a virtual disk to a larger size.
@ -319,11 +329,13 @@ class DiskManagementMixin(VSphereMixin):
vm_name: Name of the virtual machine
disk_label: Label of disk to extend (e.g., 'Hard disk 1')
new_size_gb: New total size in GB (must be larger than current)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with extension details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -357,7 +369,7 @@ class DiskManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -372,16 +384,18 @@ class DiskManagementMixin(VSphereMixin):
description="List all virtual disks attached to a VM",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_disks(self, vm_name: str) -> list[dict[str, Any]]:
def list_disks(self, vm_name: str, host: str | None = None) -> list[dict[str, Any]]:
"""List all virtual disks attached to a VM.
Args:
vm_name: Name of the virtual machine
host: Managed ESXi host to target (default: the default host)
Returns:
List of disk details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -416,6 +430,7 @@ class DiskManagementMixin(VSphereMixin):
vm_name: str,
iso_path: str,
datastore: str | None = None,
host: str | None = None,
) -> dict[str, Any]:
"""Attach an ISO file to a VM's CD/DVD drive.
@ -423,11 +438,13 @@ class DiskManagementMixin(VSphereMixin):
vm_name: Name of the virtual machine
iso_path: Path to ISO file on datastore (e.g., 'iso/ubuntu.iso')
datastore: Datastore containing the ISO (default: first VM datastore)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with attachment details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -464,7 +481,7 @@ class DiskManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -479,16 +496,18 @@ class DiskManagementMixin(VSphereMixin):
description="Detach/eject ISO from a VM's CD/DVD drive",
annotations=ToolAnnotations(destructiveHint=True),
)
def detach_iso(self, vm_name: str) -> dict[str, Any]:
def detach_iso(self, vm_name: str, host: str | None = None) -> dict[str, Any]:
"""Detach/eject ISO from a VM's CD/DVD drive.
Args:
vm_name: Name of the virtual machine
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with detachment details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -523,7 +542,7 @@ class DiskManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,

View File

@ -53,6 +53,7 @@ class GuestOpsMixin(VSphereMixin):
working_directory: str = "",
wait_for_completion: bool = True,
timeout_seconds: int = 300,
host: str | None = None,
) -> dict[str, Any]:
"""Run a command in the guest OS.
@ -65,14 +66,16 @@ class GuestOpsMixin(VSphereMixin):
working_directory: Working directory for the command
wait_for_completion: Wait for command to complete
timeout_seconds: Timeout in seconds (only if waiting)
host: Managed ESXi host to target (default: the default host)
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
self._check_tools_running(vm)
guest_ops = self.conn.content.guestOperationsManager
guest_ops = conn.content.guestOperationsManager
process_manager = guest_ops.processManager
auth = self._get_guest_auth(username, password)
@ -118,16 +121,17 @@ class GuestOpsMixin(VSphereMixin):
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_guest_processes(
self, name: str, username: str, password: str
self, name: str, username: str, password: str, host: str | None = None
) -> list[dict[str, Any]]:
"""List processes running in the guest OS."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
self._check_tools_running(vm)
guest_ops = self.conn.content.guestOperationsManager
guest_ops = conn.content.guestOperationsManager
process_manager = guest_ops.processManager
auth = self._get_guest_auth(username, password)
@ -150,7 +154,8 @@ class GuestOpsMixin(VSphereMixin):
annotations=ToolAnnotations(readOnlyHint=True),
)
def read_guest_file(
self, name: str, username: str, password: str, guest_path: str
self, name: str, username: str, password: str, guest_path: str,
host: str | None = None
) -> dict[str, Any]:
"""Read a file from the guest OS.
@ -159,14 +164,16 @@ class GuestOpsMixin(VSphereMixin):
username: Guest OS username
password: Guest OS password
guest_path: Path to file in guest (e.g., /etc/hosts, C:\\Windows\\System32\\hosts)
host: Managed ESXi host to target (default: the default host)
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
self._check_tools_running(vm)
guest_ops = self.conn.content.guestOperationsManager
guest_ops = conn.content.guestOperationsManager
file_manager = guest_ops.fileManager
auth = self._get_guest_auth(username, password)
@ -227,6 +234,7 @@ class GuestOpsMixin(VSphereMixin):
guest_path: str,
content: str,
overwrite: bool = True,
host: str | None = None,
) -> str:
"""Write a file to the guest OS.
@ -237,14 +245,16 @@ class GuestOpsMixin(VSphereMixin):
guest_path: Destination path in guest
content: File content (text)
overwrite: Overwrite if exists
host: Managed ESXi host to target (default: the default host)
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
self._check_tools_running(vm)
guest_ops = self.conn.content.guestOperationsManager
guest_ops = conn.content.guestOperationsManager
file_manager = guest_ops.fileManager
auth = self._get_guest_auth(username, password)
@ -287,16 +297,18 @@ class GuestOpsMixin(VSphereMixin):
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_guest_directory(
self, name: str, username: str, password: str, guest_path: str
self, name: str, username: str, password: str, guest_path: str,
host: str | None = None
) -> list[dict[str, Any]]:
"""List files in a guest directory."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
self._check_tools_running(vm)
guest_ops = self.conn.content.guestOperationsManager
guest_ops = conn.content.guestOperationsManager
file_manager = guest_ops.fileManager
auth = self._get_guest_auth(username, password)
@ -331,15 +343,17 @@ class GuestOpsMixin(VSphereMixin):
password: str,
guest_path: str,
create_parents: bool = True,
host: str | None = None,
) -> str:
"""Create a directory in the guest OS."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
self._check_tools_running(vm)
guest_ops = self.conn.content.guestOperationsManager
guest_ops = conn.content.guestOperationsManager
file_manager = guest_ops.fileManager
auth = self._get_guest_auth(username, password)
@ -355,16 +369,18 @@ class GuestOpsMixin(VSphereMixin):
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=True),
)
def delete_guest_file(
self, name: str, username: str, password: str, guest_path: str
self, name: str, username: str, password: str, guest_path: str,
host: str | None = None
) -> str:
"""Delete a file or directory from the guest OS."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
self._check_tools_running(vm)
guest_ops = self.conn.content.guestOperationsManager
guest_ops = conn.content.guestOperationsManager
file_manager = guest_ops.fileManager
auth = self._get_guest_auth(username, password)

View File

@ -15,9 +15,9 @@ if TYPE_CHECKING:
class HostManagementMixin(VSphereMixin):
"""ESXi host management tools."""
def _get_host(self) -> vim.HostSystem:
def _get_host(self, conn) -> vim.HostSystem:
"""Get the ESXi host system."""
for entity in self.conn.datacenter.hostFolder.childEntity:
for entity in conn.datacenter.hostFolder.childEntity:
if isinstance(entity, vim.ComputeResource):
if entity.host:
return entity.host[0]
@ -30,13 +30,17 @@ class HostManagementMixin(VSphereMixin):
description="Get detailed information about the ESXi host",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_host_info(self) -> dict[str, Any]:
def get_host_info(self, host: str | None = None) -> dict[str, Any]:
"""Get detailed ESXi host information.
Args:
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with host details including hardware, software, and status
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
summary = host.summary
hardware = summary.hardware
config = summary.config
@ -80,17 +84,20 @@ class HostManagementMixin(VSphereMixin):
self,
evacuate_vms: bool = True,
timeout_seconds: int = 300,
host: str | None = None,
) -> dict[str, Any]:
"""Put ESXi host into maintenance mode.
Args:
evacuate_vms: Evacuate/suspend VMs before entering (default True)
timeout_seconds: Timeout for the operation (default 300)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with operation result
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
if host.runtime.inMaintenanceMode:
return {
@ -104,7 +111,7 @@ class HostManagementMixin(VSphereMixin):
timeout=timeout_seconds,
evacuatePoweredOffVms=evacuate_vms,
)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"host": host.name,
@ -121,16 +128,19 @@ class HostManagementMixin(VSphereMixin):
def exit_maintenance_mode(
self,
timeout_seconds: int = 300,
host: str | None = None,
) -> dict[str, Any]:
"""Exit ESXi host from maintenance mode.
Args:
timeout_seconds: Timeout for the operation (default 300)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with operation result
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
if not host.runtime.inMaintenanceMode:
return {
@ -140,7 +150,7 @@ class HostManagementMixin(VSphereMixin):
}
task = host.ExitMaintenanceMode_Task(timeout=timeout_seconds)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"host": host.name,
@ -153,13 +163,17 @@ class HostManagementMixin(VSphereMixin):
description="List all services on the ESXi host",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_services(self) -> list[dict[str, Any]]:
def list_services(self, host: str | None = None) -> list[dict[str, Any]]:
"""List all services on the ESXi host.
Args:
host: Managed ESXi host to target (default: the default host)
Returns:
List of service details
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
service_system = host.configManager.serviceSystem
services = []
@ -180,16 +194,18 @@ class HostManagementMixin(VSphereMixin):
description="Start a service on the ESXi host",
annotations=ToolAnnotations(destructiveHint=True),
)
def start_service(self, service_key: str) -> dict[str, Any]:
def start_service(self, service_key: str, host: str | None = None) -> dict[str, Any]:
"""Start a service on the ESXi host.
Args:
service_key: Service key (e.g., 'TSM-SSH', 'ntpd', 'sfcbd')
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with operation result
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
service_system = host.configManager.serviceSystem
# Verify service exists
@ -225,16 +241,18 @@ class HostManagementMixin(VSphereMixin):
description="Stop a service on the ESXi host",
annotations=ToolAnnotations(destructiveHint=True),
)
def stop_service(self, service_key: str) -> dict[str, Any]:
def stop_service(self, service_key: str, host: str | None = None) -> dict[str, Any]:
"""Stop a service on the ESXi host.
Args:
service_key: Service key (e.g., 'TSM-SSH', 'ntpd')
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with operation result
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
service_system = host.configManager.serviceSystem
# Verify service exists
@ -274,17 +292,20 @@ class HostManagementMixin(VSphereMixin):
self,
service_key: str,
policy: str,
host: str | None = None,
) -> dict[str, Any]:
"""Set the startup policy for a service.
Args:
service_key: Service key (e.g., 'TSM-SSH', 'ntpd')
policy: Startup policy - 'on' (auto), 'off' (manual), 'automatic'
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with operation result
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
service_system = host.configManager.serviceSystem
valid_policies = ["on", "off", "automatic"]
@ -318,13 +339,17 @@ class HostManagementMixin(VSphereMixin):
description="Get NTP configuration for the ESXi host",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_ntp_config(self) -> dict[str, Any]:
def get_ntp_config(self, host: str | None = None) -> dict[str, Any]:
"""Get NTP configuration for the ESXi host.
Args:
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with NTP configuration
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
datetime_system = host.configManager.dateTimeSystem
ntp_config = datetime_system.dateTimeInfo.ntpConfig
@ -357,17 +382,20 @@ class HostManagementMixin(VSphereMixin):
self,
ntp_servers: list[str],
start_service: bool = True,
host: str | None = None,
) -> dict[str, Any]:
"""Configure NTP servers for the ESXi host.
Args:
ntp_servers: List of NTP server addresses
start_service: Start ntpd service after configuring (default True)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with configuration result
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
datetime_system = host.configManager.dateTimeSystem
# Create NTP config
@ -406,16 +434,18 @@ class HostManagementMixin(VSphereMixin):
description="Reboot the ESXi host (requires maintenance mode)",
annotations=ToolAnnotations(destructiveHint=True),
)
def reboot_host(self, force: bool = False) -> dict[str, Any]:
def reboot_host(self, force: bool = False, host: str | None = None) -> dict[str, Any]:
"""Reboot the ESXi host.
Args:
force: Force reboot even if VMs are running (dangerous!)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with operation result
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
if not host.runtime.inMaintenanceMode and not force:
raise ValueError(
@ -438,16 +468,18 @@ class HostManagementMixin(VSphereMixin):
description="Shutdown the ESXi host (requires maintenance mode)",
annotations=ToolAnnotations(destructiveHint=True),
)
def shutdown_host(self, force: bool = False) -> dict[str, Any]:
def shutdown_host(self, force: bool = False, host: str | None = None) -> dict[str, Any]:
"""Shutdown the ESXi host.
Args:
force: Force shutdown even if VMs are running (dangerous!)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with operation result
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
if not host.runtime.inMaintenanceMode and not force:
raise ValueError(
@ -470,13 +502,17 @@ class HostManagementMixin(VSphereMixin):
description="Get detailed hardware information for the ESXi host",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_host_hardware(self) -> dict[str, Any]:
def get_host_hardware(self, host: str | None = None) -> dict[str, Any]:
"""Get detailed hardware information.
Args:
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with hardware details
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
hardware = host.hardware
# CPU info
@ -532,13 +568,17 @@ class HostManagementMixin(VSphereMixin):
description="Get network configuration for the ESXi host",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_host_networking(self) -> dict[str, Any]:
def get_host_networking(self, host: str | None = None) -> dict[str, Any]:
"""Get network configuration for the ESXi host.
Args:
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with networking details
"""
host = self._get_host()
conn = self._conn(host)
host = self._get_host(conn)
network_config = host.config.network
# Virtual switches

View File

@ -21,9 +21,14 @@ class MonitoringMixin(VSphereMixin):
description="Get current performance statistics for a virtual machine",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_vm_stats(self, name: str) -> dict[str, Any]:
"""Get VM performance statistics."""
vm = self.conn.find_vm(name)
def get_vm_stats(self, name: str, host: str | None = None) -> dict[str, Any]:
"""Get VM performance statistics.
Args:
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -58,19 +63,25 @@ class MonitoringMixin(VSphereMixin):
description="Get performance statistics for an ESXi host",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_host_stats(self, host_name: str | None = None) -> dict[str, Any]:
def get_host_stats(
self, host_name: str | None = None, host: str | None = None
) -> dict[str, Any]:
"""Get ESXi host performance statistics.
If host_name is not provided, returns stats for the first host.
Args:
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
if host_name:
host = self.conn.find_host(host_name)
host = conn.find_host(host_name)
if not host:
raise ValueError(f"Host '{host_name}' not found")
else:
# Get first host
container = self.conn.content.viewManager.CreateContainerView(
self.conn.content.rootFolder, [vim.HostSystem], True
container = conn.content.viewManager.CreateContainerView(
conn.content.rootFolder, [vim.HostSystem], True
)
try:
hosts = list(container.view)
@ -117,10 +128,15 @@ class MonitoringMixin(VSphereMixin):
description="List all ESXi hosts in the datacenter",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_hosts(self) -> list[dict[str, Any]]:
"""List all ESXi hosts with basic info."""
container = self.conn.content.viewManager.CreateContainerView(
self.conn.content.rootFolder, [vim.HostSystem], True
def list_hosts(self, host: str | None = None) -> list[dict[str, Any]]:
"""List all ESXi hosts with basic info.
Args:
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
container = conn.content.viewManager.CreateContainerView(
conn.content.rootFolder, [vim.HostSystem], True
)
try:
hosts = []
@ -147,9 +163,16 @@ class MonitoringMixin(VSphereMixin):
description="Get recent vSphere tasks (VM operations, etc.)",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_recent_tasks(self, count: int = 20) -> list[dict[str, Any]]:
"""Get recent vSphere tasks."""
task_manager = self.conn.content.taskManager
def get_recent_tasks(
self, count: int = 20, host: str | None = None
) -> list[dict[str, Any]]:
"""Get recent vSphere tasks.
Args:
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
task_manager = conn.content.taskManager
recent_tasks = task_manager.recentTask[:count] if task_manager.recentTask else []
tasks = []
@ -187,10 +210,15 @@ class MonitoringMixin(VSphereMixin):
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_recent_events(
self, count: int = 50, hours: int = 24
self, count: int = 50, hours: int = 24, host: str | None = None
) -> list[dict[str, Any]]:
"""Get recent vSphere events."""
event_manager = self.conn.content.eventManager
"""Get recent vSphere events.
Args:
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
event_manager = conn.content.eventManager
# Create time filter
time_filter = vim.event.EventFilterSpec.ByTime()
@ -234,17 +262,22 @@ class MonitoringMixin(VSphereMixin):
description="Get triggered alarms in the datacenter",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_alarms(self) -> list[dict[str, Any]]:
"""Get all triggered alarms."""
def get_alarms(self, host: str | None = None) -> list[dict[str, Any]]:
"""Get all triggered alarms.
Args:
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
alarms = []
# Check datacenter alarms
if self.conn.datacenter.triggeredAlarmState:
for alarm_state in self.conn.datacenter.triggeredAlarmState:
if conn.datacenter.triggeredAlarmState:
for alarm_state in conn.datacenter.triggeredAlarmState:
alarms.append(self._format_alarm(alarm_state))
# Check VM alarms
for vm in self.conn.get_all_vms():
for vm in conn.get_all_vms():
if vm.triggeredAlarmState:
for alarm_state in vm.triggeredAlarmState:
alarms.append(self._format_alarm(alarm_state, vm.name))

View File

@ -28,10 +28,10 @@ class NICManagementMixin(VSphereMixin):
return None
def _get_network_backing(
self, network_name: str
self, conn, network_name: str
) -> vim.vm.device.VirtualEthernetCard.NetworkBackingInfo:
"""Get the appropriate backing info for a network."""
network = self.conn.find_network(network_name)
network = conn.find_network(network_name)
if not network:
raise ValueError(f"Network '{network_name}' not found")
@ -54,16 +54,18 @@ class NICManagementMixin(VSphereMixin):
description="List all network adapters attached to a VM",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_nics(self, vm_name: str) -> list[dict[str, Any]]:
def list_nics(self, vm_name: str, host: str | None = None) -> list[dict[str, Any]]:
"""List all virtual network adapters on a VM.
Args:
vm_name: Name of the virtual machine
host: Managed ESXi host to target (default: the default host)
Returns:
List of NIC details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -87,7 +89,7 @@ class NICManagementMixin(VSphereMixin):
# For distributed switch, look up the portgroup name
nic_info["network"] = f"DVS:{backing.port.portgroupKey}"
# Try to get actual name
for net in self.conn.datacenter.networkFolder.childEntity:
for net in conn.datacenter.networkFolder.childEntity:
if hasattr(net, "key") and net.key == backing.port.portgroupKey:
nic_info["network"] = net.name
break
@ -107,6 +109,7 @@ class NICManagementMixin(VSphereMixin):
network: str,
nic_type: str = "vmxnet3",
start_connected: bool = True,
host: str | None = None,
) -> dict[str, Any]:
"""Add a new network adapter to a VM.
@ -115,11 +118,13 @@ class NICManagementMixin(VSphereMixin):
network: Network/portgroup name to connect to
nic_type: Adapter type - vmxnet3 (default), e1000, e1000e
start_connected: Connect adapter when VM powers on (default True)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with new NIC details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -137,7 +142,7 @@ class NICManagementMixin(VSphereMixin):
# Create the NIC
nic = nic_class()
nic.backing = self._get_network_backing(network)
nic.backing = self._get_network_backing(conn, network)
nic.connectable = vim.vm.device.VirtualDevice.ConnectInfo()
nic.connectable.startConnected = start_connected
nic.connectable.connected = False # Can't connect until powered on
@ -155,7 +160,7 @@ class NICManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
# Get the MAC address that was assigned
vm.Reload()
@ -190,17 +195,20 @@ class NICManagementMixin(VSphereMixin):
self,
vm_name: str,
nic_label: str,
host: str | None = None,
) -> dict[str, Any]:
"""Remove a network adapter from a VM.
Args:
vm_name: Name of the virtual machine
nic_label: Label of NIC to remove (e.g., 'Network adapter 1')
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with removal details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -230,7 +238,7 @@ class NICManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -250,6 +258,7 @@ class NICManagementMixin(VSphereMixin):
vm_name: str,
nic_label: str,
new_network: str,
host: str | None = None,
) -> dict[str, Any]:
"""Change which network a NIC is connected to.
@ -257,11 +266,13 @@ class NICManagementMixin(VSphereMixin):
vm_name: Name of the virtual machine
nic_label: Label of NIC to modify (e.g., 'Network adapter 1')
new_network: New network/portgroup name
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with change details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -280,7 +291,7 @@ class NICManagementMixin(VSphereMixin):
old_network = nic.backing.deviceName
# Update backing to new network
nic.backing = self._get_network_backing(new_network)
nic.backing = self._get_network_backing(conn, new_network)
# Create device edit spec
nic_spec = vim.vm.device.VirtualDeviceSpec()
@ -293,7 +304,7 @@ class NICManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -314,6 +325,7 @@ class NICManagementMixin(VSphereMixin):
vm_name: str,
nic_label: str,
connected: bool = True,
host: str | None = None,
) -> dict[str, Any]:
"""Connect or disconnect a NIC on a running VM.
@ -321,11 +333,13 @@ class NICManagementMixin(VSphereMixin):
vm_name: Name of the virtual machine
nic_label: Label of NIC (e.g., 'Network adapter 1')
connected: True to connect, False to disconnect
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with connection status
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -355,7 +369,7 @@ class NICManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -374,6 +388,7 @@ class NICManagementMixin(VSphereMixin):
vm_name: str,
nic_label: str,
mac_address: str,
host: str | None = None,
) -> dict[str, Any]:
"""Set a custom MAC address for a NIC.
@ -381,11 +396,13 @@ class NICManagementMixin(VSphereMixin):
vm_name: Name of the virtual machine
nic_label: Label of NIC (e.g., 'Network adapter 1')
mac_address: MAC address in format XX:XX:XX:XX:XX:XX
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with MAC address change details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -420,7 +437,7 @@ class NICManagementMixin(VSphereMixin):
# Reconfigure VM
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,

View File

@ -56,11 +56,12 @@ class OVFManagementMixin(VSphereMixin):
_lease: vim.HttpNfcLease,
disk_path: str,
device_url: str,
conn,
) -> None:
"""Upload a disk file via NFC lease."""
# Create SSL context
context = ssl.create_default_context()
if self.conn.settings.vcenter_insecure:
if conn.settings.vcenter_insecure:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
@ -74,8 +75,8 @@ class OVFManagementMixin(VSphereMixin):
request.add_header("Connection", "Keep-Alive")
# Add session cookie
if hasattr(self.conn.service_instance, "_stub"):
cookie = self.conn.service_instance._stub.cookie
if hasattr(conn.service_instance, "_stub"):
cookie = conn.service_instance._stub.cookie
if cookie:
request.add_header("Cookie", cookie)
@ -96,6 +97,7 @@ class OVFManagementMixin(VSphereMixin):
datastore: str,
network: str | None = None,
power_on: bool = False,
host: str | None = None,
) -> dict[str, Any]:
"""Deploy a virtual machine from an OVF or OVA file.
@ -107,21 +109,23 @@ class OVFManagementMixin(VSphereMixin):
datastore: Target datastore for VM files
network: Network to connect VM to (optional)
power_on: Power on VM after deployment (default False)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with deployment details
"""
conn = self._conn(host)
# Get OVF Manager
ovf_manager = self.conn.content.ovfManager
ovf_manager = conn.content.ovfManager
# Find target datastore
ds = self.conn.find_datastore(datastore)
ds = conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
# Get resource pool and folder
host = None
for h in self.conn.datacenter.hostFolder.childEntity:
for h in conn.datacenter.hostFolder.childEntity:
if hasattr(h, "host"):
host = h.host[0] if h.host else None
break
@ -139,7 +143,7 @@ class OVFManagementMixin(VSphereMixin):
resource_pool = host.parent.resourcePool
# Get VM folder
vm_folder = self.conn.datacenter.vmFolder
vm_folder = conn.datacenter.vmFolder
# Read OVF descriptor from datastore
# For OVA, we need to extract first
@ -154,7 +158,7 @@ class OVFManagementMixin(VSphereMixin):
)
# Read OVF descriptor via datastore browser
ovf_descriptor = self._read_datastore_file(datastore, ovf_path)
ovf_descriptor = self._read_datastore_file(datastore, ovf_path, conn)
# Create import spec params
import_spec_params = vim.OvfManager.CreateImportSpecParams(
@ -164,7 +168,7 @@ class OVFManagementMixin(VSphereMixin):
# If network specified, add network mapping
if network:
net = self.conn.find_network(network)
net = conn.find_network(network)
if net:
network_mapping = vim.OvfManager.NetworkMapping(
name="VM Network", # Common default in OVF
@ -213,7 +217,7 @@ class OVFManagementMixin(VSphereMixin):
lease.Complete()
# Find the newly created VM
vm = self.conn.find_vm(vm_name)
vm = conn.find_vm(vm_name)
result = {
"vm": vm_name,
@ -226,35 +230,35 @@ class OVFManagementMixin(VSphereMixin):
result["uuid"] = vm.config.uuid
if power_on:
task = vm.PowerOnVM_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
result["power_state"] = "poweredOn"
else:
result["power_state"] = "poweredOff"
return result
def _read_datastore_file(self, datastore: str, path: str) -> str:
def _read_datastore_file(self, datastore: str, path: str, conn) -> str:
"""Read a text file from datastore."""
ds = self.conn.find_datastore(datastore)
ds = conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
# Build HTTP URL
dc_name = self.conn.datacenter.name
dc_name = conn.datacenter.name
url = (
f"https://{self.conn.settings.vcenter_host}/folder/{path}"
f"https://{conn.settings.vcenter_host}/folder/{path}"
f"?dcPath={dc_name}&dsName={datastore}"
)
# Setup request
context = ssl.create_default_context()
if self.conn.settings.vcenter_insecure:
if conn.settings.vcenter_insecure:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
request = urllib.request.Request(url)
if hasattr(self.conn.service_instance, "_stub"):
cookie = self.conn.service_instance._stub.cookie
if hasattr(conn.service_instance, "_stub"):
cookie = conn.service_instance._stub.cookie
if cookie:
request.add_header("Cookie", cookie)
@ -269,11 +273,11 @@ class OVFManagementMixin(VSphereMixin):
# For now, document this limitation
pass
def _resolve_ova_file(self, ova_path: str) -> tuple[str, bool]:
def _resolve_ova_file(self, ova_path: str, conn) -> tuple[str, bool]:
"""Return (local_path, is_temp); downloads the OVA first if a URL is given."""
if ova_path.startswith(("http://", "https://")):
context = ssl.create_default_context()
if self.conn.settings.vcenter_insecure:
if conn.settings.vcenter_insecure:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
fd, tmp = tempfile.mkstemp(suffix=".ova", prefix="ova_dl_")
@ -301,7 +305,9 @@ class OVFManagementMixin(VSphereMixin):
names.append(val)
return names
def _put_stream_to_lease(self, device_url: str, fileobj: Any, size: int) -> None:
def _put_stream_to_lease(
self, device_url: str, fileobj: Any, size: int, conn
) -> None:
"""Stream a file object to an NFC lease device URL via chunked HTTP PUT."""
parsed = urlparse(device_url)
host = parsed.hostname
@ -311,43 +317,47 @@ class OVFManagementMixin(VSphereMixin):
if (
not host
or host == "*"
or self.conn.content.about.apiType == "HostAgent"
or conn.content.about.apiType == "HostAgent"
):
host = self.conn.settings.vcenter_host
host = conn.settings.vcenter_host
port = parsed.port or 443
path = parsed.path + (f"?{parsed.query}" if parsed.query else "")
context = ssl.create_default_context()
if self.conn.settings.vcenter_insecure:
if conn.settings.vcenter_insecure:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
conn = http.client.HTTPSConnection(host, port, context=context, timeout=900)
http_conn = http.client.HTTPSConnection(
host, port, context=context, timeout=900
)
try:
conn.putrequest("PUT", path, skip_host=True, skip_accept_encoding=True)
conn.putheader("Host", host)
conn.putheader("Content-Length", str(size))
conn.putheader("Content-Type", "application/x-vnd.vmware-streamVmdk")
http_conn.putrequest(
"PUT", path, skip_host=True, skip_accept_encoding=True
)
http_conn.putheader("Host", host)
http_conn.putheader("Content-Length", str(size))
http_conn.putheader("Content-Type", "application/x-vnd.vmware-streamVmdk")
# ImportVApp pre-creates the disk file, so the NFC stream must
# explicitly overwrite it (else ESXi returns 403 "File exists").
conn.putheader("Overwrite", "t")
cookie = getattr(self.conn.si._stub, "cookie", None)
http_conn.putheader("Overwrite", "t")
cookie = getattr(conn.si._stub, "cookie", None)
if cookie:
conn.putheader("Cookie", cookie)
conn.endheaders()
http_conn.putheader("Cookie", cookie)
http_conn.endheaders()
while True:
chunk = fileobj.read(1024 * 1024)
if not chunk:
break
conn.send(chunk)
resp = conn.getresponse()
http_conn.send(chunk)
resp = http_conn.getresponse()
resp.read()
if resp.status not in (200, 201):
raise ValueError(
f"Disk upload failed: HTTP {resp.status} {resp.reason}"
)
finally:
conn.close()
http_conn.close()
@mcp_tool(
name="deploy_ova",
@ -366,6 +376,7 @@ class OVFManagementMixin(VSphereMixin):
iso_path: str | None = None,
iso_datastore: str | None = None,
boot_from_iso: bool = True,
host: str | None = None,
) -> dict[str, Any]:
"""Deploy a virtual machine from an OVA file.
@ -389,25 +400,27 @@ class OVFManagementMixin(VSphereMixin):
a bootable ISO). A CD/DVD drive is added automatically.
iso_datastore: Datastore holding the ISO (default: the VM's datastore)
boot_from_iso: When an ISO is given, put the CD/DVD first in boot order
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with deployment details
"""
ds = self.conn.find_datastore(datastore) if datastore else self.conn.datastore
conn = self._conn(host)
ds = conn.find_datastore(datastore) if datastore else conn.datastore
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
resource_pool = self.conn.resource_pool
vm_folder = self.conn.datacenter.vmFolder
resource_pool = conn.resource_pool
vm_folder = conn.datacenter.vmFolder
# An ESXi host is required as the import target
view = self.conn.content.viewManager.CreateContainerView(
self.conn.content.rootFolder, [vim.HostSystem], True
view = conn.content.viewManager.CreateContainerView(
conn.content.rootFolder, [vim.HostSystem], True
)
hosts = list(view.view)
view.Destroy()
host_system = hosts[0] if hosts else None
local_ova, is_temp = self._resolve_ova_file(ova_path)
local_ova, is_temp = self._resolve_ova_file(ova_path, conn)
import_spec = None
try:
with tarfile.open(local_ova) as tar:
@ -427,7 +440,7 @@ class OVFManagementMixin(VSphereMixin):
if deployment_option:
spec_params.deploymentOption = deployment_option
if network:
net = self.conn.find_network(network)
net = conn.find_network(network)
if not net:
raise ValueError(f"Network '{network}' not found")
ovf_nets = self._parse_ovf_network_names(ovf_xml) or ["VM Network"]
@ -436,7 +449,7 @@ class OVFManagementMixin(VSphereMixin):
for n in ovf_nets
]
ovf_manager = self.conn.content.ovfManager
ovf_manager = conn.content.ovfManager
import_spec = ovf_manager.CreateImportSpec(
ovfDescriptor=ovf_xml,
resourcePool=resource_pool,
@ -485,7 +498,9 @@ class OVFManagementMixin(VSphereMixin):
if not member:
raise ValueError(f"Disk '{fi.path}' missing from OVA")
src = tar.extractfile(member)
self._put_stream_to_lease(device_url, src, member.size)
self._put_stream_to_lease(
device_url, src, member.size, conn
)
uploaded[0] += member.size
lease.HttpNfcLeaseProgress(
min(99, int(uploaded[0] * 100 / total))
@ -502,7 +517,7 @@ class OVFManagementMixin(VSphereMixin):
if is_temp:
Path(local_ova).unlink(missing_ok=True)
vm = self.conn.find_vm(vm_name)
vm = conn.find_vm(vm_name)
result: dict[str, Any] = {
"vm": vm_name,
"action": "ova_deployed",
@ -514,11 +529,11 @@ class OVFManagementMixin(VSphereMixin):
result["uuid"] = vm.config.uuid
if iso_path:
result["iso"] = self._add_cdrom_with_iso(
vm, iso_path, iso_datastore, boot_from_iso
vm, iso_path, iso_datastore, boot_from_iso, conn
)
if power_on:
task = vm.PowerOnVM_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
result["power_state"] = "poweredOn"
else:
result["power_state"] = "poweredOff"
@ -530,6 +545,7 @@ class OVFManagementMixin(VSphereMixin):
iso_path: str,
iso_datastore: str | None,
boot_from_iso: bool,
conn,
) -> str:
"""Mount an ISO in the VM's CD/DVD drive (reusing the first existing
drive, or adding one if none exist) and optionally boot from it.
@ -601,7 +617,7 @@ class OVFManagementMixin(VSphereMixin):
bootOrder=[vim.vm.BootOptions.BootableCdromDevice()]
)
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return mounted
@mcp_tool(
@ -609,7 +625,7 @@ class OVFManagementMixin(VSphereMixin):
description="Inspect an OVA (local path or URL): product, deployment configurations, networks, and disks",
annotations=ToolAnnotations(readOnlyHint=True),
)
def inspect_ova(self, ova_path: str) -> dict[str, Any]:
def inspect_ova(self, ova_path: str, host: str | None = None) -> dict[str, Any]:
"""Inspect an OVA without deploying it.
Reads the OVF descriptor and reports the product, guest OS, hardware
@ -619,11 +635,13 @@ class OVFManagementMixin(VSphereMixin):
Args:
ova_path: Local path to the .ova file, or an http(s) URL
host: Managed ESXi host to target (default: the default host)
Returns:
Dict describing the OVA
"""
local_ova, is_temp = self._resolve_ova_file(ova_path)
conn = self._conn(host)
local_ova, is_temp = self._resolve_ova_file(ova_path, conn)
try:
with tarfile.open(local_ova) as tar:
ovf_member = next(
@ -711,6 +729,7 @@ class OVFManagementMixin(VSphereMixin):
vm_name: str,
target_path: str,
datastore: str | None = None,
host: str | None = None,
) -> dict[str, Any]:
"""Export a virtual machine to OVF format.
@ -718,11 +737,13 @@ class OVFManagementMixin(VSphereMixin):
vm_name: Name of the VM to export
target_path: Target directory path on datastore
datastore: Target datastore (default: VM's datastore)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with export details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -732,7 +753,7 @@ class OVFManagementMixin(VSphereMixin):
# Determine target datastore
if datastore:
ds = self.conn.find_datastore(datastore)
ds = conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
ds_name = datastore
@ -750,7 +771,7 @@ class OVFManagementMixin(VSphereMixin):
raise ValueError(f"Export lease error: {lease.error}")
# Get OVF descriptor
ovf_manager = self.conn.content.ovfManager
ovf_manager = conn.content.ovfManager
ovf_descriptor = ovf_manager.CreateDescriptor(
obj=vm,
cdp=vim.OvfManager.CreateDescriptorParams(
@ -784,7 +805,9 @@ class OVFManagementMixin(VSphereMixin):
ovf_output_path = f"{target_path}/{ovf_filename}"
# Upload OVF descriptor to datastore
self._write_datastore_file(ds_name, ovf_output_path, ovf_descriptor.ovfDescriptor)
self._write_datastore_file(
ds_name, ovf_output_path, ovf_descriptor.ovfDescriptor, conn
)
exported_files.append(ovf_output_path)
@ -800,17 +823,19 @@ class OVFManagementMixin(VSphereMixin):
"ovf_descriptor": ovf_filename,
}
def _write_datastore_file(self, datastore: str, path: str, content: str) -> None:
def _write_datastore_file(
self, datastore: str, path: str, content: str, conn
) -> None:
"""Write a text file to datastore."""
dc_name = self.conn.datacenter.name
dc_name = conn.datacenter.name
url = (
f"https://{self.conn.settings.vcenter_host}/folder/{path}"
f"https://{conn.settings.vcenter_host}/folder/{path}"
f"?dcPath={dc_name}&dsName={datastore}"
)
# Setup request
context = ssl.create_default_context()
if self.conn.settings.vcenter_insecure:
if conn.settings.vcenter_insecure:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
@ -819,8 +844,8 @@ class OVFManagementMixin(VSphereMixin):
request.add_header("Content-Type", "application/xml")
request.add_header("Content-Length", str(len(data)))
if hasattr(self.conn.service_instance, "_stub"):
cookie = self.conn.service_instance._stub.cookie
if hasattr(conn.service_instance, "_stub"):
cookie = conn.service_instance._stub.cookie
if cookie:
request.add_header("Cookie", cookie)
@ -835,25 +860,28 @@ class OVFManagementMixin(VSphereMixin):
self,
ovf_path: str,
datastore: str,
host: str | None = None,
) -> list[dict[str, str]]:
"""List networks defined in an OVF descriptor.
Args:
ovf_path: Path to OVF file on datastore
datastore: Datastore containing the OVF
host: Managed ESXi host to target (default: the default host)
Returns:
List of network definitions
"""
conn = self._conn(host)
# Read OVF descriptor
ovf_descriptor = self._read_datastore_file(datastore, ovf_path)
ovf_descriptor = self._read_datastore_file(datastore, ovf_path, conn)
# Parse network references
ovf_manager = self.conn.content.ovfManager
ovf_manager = conn.content.ovfManager
# Get resource pool for parsing
host = None
for h in self.conn.datacenter.hostFolder.childEntity:
for h in conn.datacenter.hostFolder.childEntity:
if hasattr(h, "host"):
host = h.host[0] if h.host else None
break
@ -862,7 +890,7 @@ class OVFManagementMixin(VSphereMixin):
raise ValueError("No ESXi host found")
resource_pool = host.parent.resourcePool if hasattr(host, "parent") else None
ds = self.conn.find_datastore(datastore)
ds = conn.find_datastore(datastore)
# Create parse params to extract network info
import_spec_params = vim.OvfManager.CreateImportSpecParams()

View File

@ -20,9 +20,10 @@ class PowerOpsMixin(VSphereMixin):
description="Power on a virtual machine",
annotations=ToolAnnotations(destructiveHint=False, idempotentHint=True),
)
def power_on(self, name: str) -> str:
def power_on(self, name: str, host: str | None = None) -> str:
"""Power on a virtual machine."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -30,7 +31,7 @@ class PowerOpsMixin(VSphereMixin):
return f"VM '{name}' is already powered on"
task = vm.PowerOnVM_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{name}' powered on"
@ -39,9 +40,10 @@ class PowerOpsMixin(VSphereMixin):
description="Power off a virtual machine (hard shutdown, like pulling the power cord)",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=True),
)
def power_off(self, name: str) -> str:
def power_off(self, name: str, host: str | None = None) -> str:
"""Power off a virtual machine (hard shutdown)."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -49,7 +51,7 @@ class PowerOpsMixin(VSphereMixin):
return f"VM '{name}' is already powered off"
task = vm.PowerOffVM_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{name}' powered off"
@ -58,9 +60,10 @@ class PowerOpsMixin(VSphereMixin):
description="Gracefully shutdown the guest OS (requires VMware Tools installed and running)",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=True),
)
def shutdown_guest(self, name: str) -> str:
def shutdown_guest(self, name: str, host: str | None = None) -> str:
"""Gracefully shutdown the guest OS."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -81,9 +84,10 @@ class PowerOpsMixin(VSphereMixin):
description="Gracefully reboot the guest OS (requires VMware Tools)",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=False),
)
def reboot_guest(self, name: str) -> str:
def reboot_guest(self, name: str, host: str | None = None) -> str:
"""Gracefully reboot the guest OS."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -104,14 +108,15 @@ class PowerOpsMixin(VSphereMixin):
description="Reset (hard reboot) a virtual machine - like pressing the reset button",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=False),
)
def reset_vm(self, name: str) -> str:
def reset_vm(self, name: str, host: str | None = None) -> str:
"""Reset (hard reboot) a virtual machine."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
task = vm.ResetVM_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{name}' reset"
@ -120,9 +125,10 @@ class PowerOpsMixin(VSphereMixin):
description="Suspend a virtual machine (save state to disk)",
annotations=ToolAnnotations(destructiveHint=False, idempotentHint=True),
)
def suspend_vm(self, name: str) -> str:
def suspend_vm(self, name: str, host: str | None = None) -> str:
"""Suspend a virtual machine."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -133,7 +139,7 @@ class PowerOpsMixin(VSphereMixin):
return f"VM '{name}' is powered off, cannot suspend"
task = vm.SuspendVM_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{name}' suspended"
@ -142,9 +148,10 @@ class PowerOpsMixin(VSphereMixin):
description="Put guest OS into standby mode (requires VMware Tools)",
annotations=ToolAnnotations(destructiveHint=False, idempotentHint=True),
)
def standby_guest(self, name: str) -> str:
def standby_guest(self, name: str, host: str | None = None) -> str:
"""Put guest OS into standby mode."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")

View File

@ -26,13 +26,13 @@ class ResourcesMixin(VSphereMixin):
)
def browse_datastore_root(self, datastore_name: str) -> list[dict[str, Any]]:
"""Browse files and folders at the root of a datastore."""
return self._browse_datastore_path(datastore_name, "")
return self._browse_datastore_path(self.conn, datastore_name, "")
def _browse_datastore_path(
self, datastore_name: str, path: str
self, conn, datastore_name: str, path: str
) -> list[dict[str, Any]]:
"""Browse files and folders on a datastore at a given path."""
ds = self.conn.find_datastore(datastore_name)
ds = conn.find_datastore(datastore_name)
if not ds:
raise ValueError(f"Datastore '{datastore_name}' not found")
@ -56,7 +56,7 @@ class ResourcesMixin(VSphereMixin):
# Search for files
task = browser.SearchDatastore_Task(ds_path, search_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
results = []
if task.info.result and task.info.result.file:
@ -85,7 +85,7 @@ class ResourcesMixin(VSphereMixin):
return sorted(results, key=lambda x: (x["type"] != "Folder", x["name"]))
def _stream_from_esxi(self, datastore: str, path: str, chunk_size: int = 1024 * 1024):
def _stream_from_esxi(self, conn, datastore: str, path: str, chunk_size: int = 1024 * 1024):
"""Generator that streams file content from ESXi datastore.
Yields raw bytes chunks as they arrive from ESXi HTTP API.
@ -95,13 +95,13 @@ class ResourcesMixin(VSphereMixin):
import urllib.request
from urllib.parse import quote
ds = self.conn.find_datastore(datastore)
ds = conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
# Build download URL
dc_name = self.conn.datacenter.name
host = self.conn.settings.vcenter_host
dc_name = conn.datacenter.name
host = conn.settings.vcenter_host
encoded_path = quote(path, safe="")
url = (
@ -111,12 +111,12 @@ class ResourcesMixin(VSphereMixin):
# Create SSL context
context = ssl.create_default_context()
if self.conn.settings.vcenter_insecure:
if conn.settings.vcenter_insecure:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Get session cookie
stub = self.conn.si._stub
stub = conn.si._stub
cookie = stub.cookie
request = urllib.request.Request(url, method="GET")
@ -146,15 +146,17 @@ class ResourcesMixin(VSphereMixin):
annotations=ToolAnnotations(readOnlyHint=True),
)
def browse_datastore_tool(
self, datastore: str, path: str = ""
self, datastore: str, path: str = "", host: str | None = None
) -> list[dict[str, Any]]:
"""Browse files at a specific path on a datastore.
Args:
datastore: Datastore name (e.g., c1_ds-02)
path: Path within datastore (e.g., "rpm-desktop-1/" or "" for root)
host: Managed ESXi host to target (default: the default host)
"""
return self._browse_datastore_path(datastore, path)
conn = self._conn(host)
return self._browse_datastore_path(conn, datastore, path)
@mcp_tool(
name="download_from_datastore",
@ -167,6 +169,7 @@ class ResourcesMixin(VSphereMixin):
path: str,
save_to: str | None = None,
max_memory_mb: int = 50,
host: str | None = None,
) -> dict[str, Any]:
"""Download a file from a datastore using streaming.
@ -179,14 +182,16 @@ class ResourcesMixin(VSphereMixin):
path: Path to file on datastore (e.g., "iso/readme.txt")
save_to: Local path to save file (recommended for large files)
max_memory_mb: Max file size in MB to return in response (default 50MB)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with file content or save confirmation
"""
import base64
conn = self._conn(host)
max_bytes = max_memory_mb * 1024 * 1024
stream = self._stream_from_esxi(datastore, path)
stream = self._stream_from_esxi(conn, datastore, path)
# First yield is total size (or None)
total_size = next(stream)
@ -260,6 +265,7 @@ class ResourcesMixin(VSphereMixin):
local_path: str | None = None,
content_base64: str | None = None,
chunk_size: int = 8 * 1024 * 1024, # 8MB chunks
host: str | None = None,
) -> dict[str, Any]:
"""Upload a file to a datastore.
@ -272,6 +278,7 @@ class ResourcesMixin(VSphereMixin):
local_path: Local file path to upload - streams from disk (preferred for large files)
content_base64: Base64-encoded file content (for small files only)
chunk_size: Chunk size for streaming uploads (default 8MB)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with upload details including size and whether streaming was used
@ -281,35 +288,37 @@ class ResourcesMixin(VSphereMixin):
import ssl
import urllib.request
conn = self._conn(host)
if not local_path and not content_base64:
raise ValueError("Either local_path or content_base64 must be provided")
if local_path and content_base64:
raise ValueError("Only one of local_path or content_base64 can be provided")
ds = self.conn.find_datastore(datastore)
ds = conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
# Build upload URL
dc_name = self.conn.datacenter.name
host = self.conn.settings.vcenter_host
dc_name = conn.datacenter.name
vc_host = conn.settings.vcenter_host
from urllib.parse import quote
encoded_path = quote(remote_path, safe="")
url = (
f"https://{host}/folder/{encoded_path}"
f"https://{vc_host}/folder/{encoded_path}"
f"?dcPath={quote(dc_name)}&dsName={quote(datastore)}"
)
# Create SSL context
context = ssl.create_default_context()
if self.conn.settings.vcenter_insecure:
if conn.settings.vcenter_insecure:
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
# Get session cookie from existing connection
stub = self.conn.si._stub
stub = conn.si._stub
cookie = stub.cookie
if local_path:
@ -398,17 +407,21 @@ class ResourcesMixin(VSphereMixin):
description="Delete a file or folder from a datastore",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=True),
)
def delete_datastore_file(self, datastore: str, path: str) -> str:
def delete_datastore_file(
self, datastore: str, path: str, host: str | None = None
) -> str:
"""Delete a file or folder from a datastore.
Args:
datastore: Datastore name
path: Path to file or folder to delete (e.g., "iso/old-file.iso")
host: Managed ESXi host to target (default: the default host)
Returns:
Success message
"""
ds = self.conn.find_datastore(datastore)
conn = self._conn(host)
ds = conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
@ -416,11 +429,11 @@ class ResourcesMixin(VSphereMixin):
ds_path = f"[{datastore}] {path}"
# Use FileManager to delete
file_manager = self.conn.content.fileManager
dc = self.conn.datacenter
file_manager = conn.content.fileManager
dc = conn.datacenter
task = file_manager.DeleteDatastoreFile_Task(name=ds_path, datacenter=dc)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"Deleted [{datastore}] {path}"
@ -429,17 +442,21 @@ class ResourcesMixin(VSphereMixin):
description="Create a folder on a datastore",
annotations=ToolAnnotations(destructiveHint=False, idempotentHint=True),
)
def create_datastore_folder(self, datastore: str, path: str) -> str:
def create_datastore_folder(
self, datastore: str, path: str, host: str | None = None
) -> str:
"""Create a folder on a datastore.
Args:
datastore: Datastore name
path: Folder path to create (e.g., "iso/new-folder")
host: Managed ESXi host to target (default: the default host)
Returns:
Success message
"""
ds = self.conn.find_datastore(datastore)
conn = self._conn(host)
ds = conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
@ -447,8 +464,8 @@ class ResourcesMixin(VSphereMixin):
ds_path = f"[{datastore}] {path}"
# Use FileManager to create directory
file_manager = self.conn.content.fileManager
file_manager.MakeDirectory(name=ds_path, datacenter=self.conn.datacenter)
file_manager = conn.content.fileManager
file_manager.MakeDirectory(name=ds_path, datacenter=conn.datacenter)
return f"Created folder [{datastore}] {path}"
@ -473,6 +490,7 @@ class ResourcesMixin(VSphereMixin):
source_path: str,
dest_datastore: str | None = None,
dest_path: str | None = None,
host: str | None = None,
) -> dict[str, Any]:
"""Move or rename a file or folder on a datastore.
@ -481,11 +499,13 @@ class ResourcesMixin(VSphereMixin):
source_path: Source path (e.g., "iso/old-name.iso")
dest_datastore: Destination datastore (default: same as source)
dest_path: Destination path (default: same as source with new name)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with move operation details
"""
ds = self.conn.find_datastore(source_datastore)
conn = self._conn(host)
ds = conn.find_datastore(source_datastore)
if not ds:
raise ValueError(f"Datastore '{source_datastore}' not found")
@ -493,7 +513,7 @@ class ResourcesMixin(VSphereMixin):
if not dest_datastore:
dest_datastore = source_datastore
else:
dest_ds = self.conn.find_datastore(dest_datastore)
dest_ds = conn.find_datastore(dest_datastore)
if not dest_ds:
raise ValueError(f"Destination datastore '{dest_datastore}' not found")
@ -505,8 +525,8 @@ class ResourcesMixin(VSphereMixin):
dest_ds_path = f"[{dest_datastore}] {dest_path}"
# Use FileManager to move
file_manager = self.conn.content.fileManager
dc = self.conn.datacenter
file_manager = conn.content.fileManager
dc = conn.datacenter
task = file_manager.MoveDatastoreFile_Task(
sourceName=source_ds_path,
@ -515,7 +535,7 @@ class ResourcesMixin(VSphereMixin):
destinationDatacenter=dc,
force=False,
)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"action": "moved",
@ -535,6 +555,7 @@ class ResourcesMixin(VSphereMixin):
dest_datastore: str | None = None,
dest_path: str | None = None,
force: bool = False,
host: str | None = None,
) -> dict[str, Any]:
"""Copy a file or folder on a datastore.
@ -544,11 +565,13 @@ class ResourcesMixin(VSphereMixin):
dest_datastore: Destination datastore (default: same as source)
dest_path: Destination path (required)
force: Overwrite destination if exists (default False)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with copy operation details
"""
ds = self.conn.find_datastore(source_datastore)
conn = self._conn(host)
ds = conn.find_datastore(source_datastore)
if not ds:
raise ValueError(f"Datastore '{source_datastore}' not found")
@ -556,7 +579,7 @@ class ResourcesMixin(VSphereMixin):
if not dest_datastore:
dest_datastore = source_datastore
else:
dest_ds = self.conn.find_datastore(dest_datastore)
dest_ds = conn.find_datastore(dest_datastore)
if not dest_ds:
raise ValueError(f"Destination datastore '{dest_datastore}' not found")
@ -568,8 +591,8 @@ class ResourcesMixin(VSphereMixin):
dest_ds_path = f"[{dest_datastore}] {dest_path}"
# Use FileManager to copy
file_manager = self.conn.content.fileManager
dc = self.conn.datacenter
file_manager = conn.content.fileManager
dc = conn.datacenter
task = file_manager.CopyDatastoreFile_Task(
sourceName=source_ds_path,
@ -578,7 +601,7 @@ class ResourcesMixin(VSphereMixin):
destinationDatacenter=dc,
force=force,
)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"action": "copied",
@ -718,9 +741,15 @@ class ResourcesMixin(VSphereMixin):
description="Get detailed information about a specific datastore",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_datastore_info(self, name: str) -> dict[str, Any]:
"""Get detailed datastore information."""
ds = self.conn.find_datastore(name)
def get_datastore_info(self, name: str, host: str | None = None) -> dict[str, Any]:
"""Get detailed datastore information.
Args:
name: Datastore name
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
ds = conn.find_datastore(name)
if not ds:
raise ValueError(f"Datastore '{name}' not found")
@ -752,9 +781,15 @@ class ResourcesMixin(VSphereMixin):
description="Get detailed information about a specific network",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_network_info(self, name: str) -> dict[str, Any]:
"""Get detailed network information."""
net = self.conn.find_network(name)
def get_network_info(self, name: str, host: str | None = None) -> dict[str, Any]:
"""Get detailed network information.
Args:
name: Network name
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
net = conn.find_network(name)
if not net:
raise ValueError(f"Network '{name}' not found")
@ -783,14 +818,21 @@ class ResourcesMixin(VSphereMixin):
description="Get information about resource pools",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_resource_pool_info(self, name: str | None = None) -> dict[str, Any]:
def get_resource_pool_info(
self, name: str | None = None, host: str | None = None
) -> dict[str, Any]:
"""Get resource pool information.
If name is not provided, returns info for the default resource pool.
Args:
name: Resource pool name (default: the default resource pool)
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
if name:
container = self.conn.content.viewManager.CreateContainerView(
self.conn.content.rootFolder, [vim.ResourcePool], True
container = conn.content.viewManager.CreateContainerView(
conn.content.rootFolder, [vim.ResourcePool], True
)
try:
pool = next((p for p in container.view if p.name == name), None)
@ -799,7 +841,7 @@ class ResourcesMixin(VSphereMixin):
if not pool:
raise ValueError(f"Resource pool '{name}' not found")
else:
pool = self.conn.resource_pool
pool = conn.resource_pool
runtime = pool.summary.runtime
config = pool.summary.config
@ -822,10 +864,15 @@ class ResourcesMixin(VSphereMixin):
description="List all VM templates in the inventory",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_templates(self) -> list[dict[str, Any]]:
"""List all VM templates."""
def list_templates(self, host: str | None = None) -> list[dict[str, Any]]:
"""List all VM templates.
Args:
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
templates = []
for vm in self.conn.get_all_vms():
for vm in conn.get_all_vms():
if vm.config and vm.config.template:
templates.append(
{
@ -842,9 +889,14 @@ class ResourcesMixin(VSphereMixin):
description="Get vCenter/ESXi server information",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_vcenter_info(self) -> dict[str, Any]:
"""Get vCenter/ESXi server information."""
about = self.conn.content.about
def get_vcenter_info(self, host: str | None = None) -> dict[str, Any]:
"""Get vCenter/ESXi server information.
Args:
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
about = conn.content.about
return {
"name": about.name,
"full_name": about.fullName,

View File

@ -66,16 +66,18 @@ class SerialPortMixin(VSphereMixin):
description="Get current serial port configuration for a VM",
annotations=ToolAnnotations(readOnlyHint=True),
)
def get_serial_port(self, name: str) -> dict[str, Any]:
def get_serial_port(self, name: str, host: str | None = None) -> dict[str, Any]:
"""Get serial port configuration.
Args:
name: VM name
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with serial port details or message if not configured
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -109,6 +111,7 @@ class SerialPortMixin(VSphereMixin):
port: int | None = None,
direction: str = "server",
yield_on_poll: bool = True,
host: str | None = None,
) -> dict[str, Any]:
"""Setup or update network serial port.
@ -118,11 +121,13 @@ class SerialPortMixin(VSphereMixin):
port: TCP port number. If not specified, auto-assigns unused port.
direction: 'server' (VM listens) or 'client' (VM connects). Default: server
yield_on_poll: Enable CPU yield behavior. Default: True
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with configured serial port URI and details
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -141,8 +146,8 @@ class SerialPortMixin(VSphereMixin):
# Find or assign port
if port is None:
host = vm.runtime.host
host_ip = host.name if host else self.conn.settings.vcenter_host
vm_host = vm.runtime.host
host_ip = vm_host.name if vm_host else conn.settings.vcenter_host
port = self._find_unused_port(host_ip)
# Build service URI
@ -177,11 +182,11 @@ class SerialPortMixin(VSphereMixin):
spec = vim.vm.ConfigSpec()
spec.deviceChange = [serial_spec]
task = vm.ReconfigVM_Task(spec=spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
# Get ESXi host info for connection string
host = vm.runtime.host
host_ip = host.name if host else self.conn.settings.vcenter_host
vm_host = vm.runtime.host
host_ip = vm_host.name if vm_host else conn.settings.vcenter_host
return {
"vm_name": name,
@ -199,17 +204,19 @@ class SerialPortMixin(VSphereMixin):
description="Connect or disconnect an existing serial port on a VM",
annotations=ToolAnnotations(destructiveHint=False, idempotentHint=True),
)
def connect_serial_port(self, name: str, connected: bool = True) -> dict[str, Any]:
def connect_serial_port(self, name: str, connected: bool = True, host: str | None = None) -> dict[str, Any]:
"""Connect or disconnect serial port.
Args:
name: VM name
connected: True to connect, False to disconnect. Default: True
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with result
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -226,7 +233,7 @@ class SerialPortMixin(VSphereMixin):
spec = vim.vm.ConfigSpec()
spec.deviceChange = [serial_spec]
task = vm.ReconfigVM_Task(spec=spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm_name": name,
@ -239,18 +246,20 @@ class SerialPortMixin(VSphereMixin):
description="Reset serial port by disconnecting and reconnecting (clears stuck connections)",
annotations=ToolAnnotations(destructiveHint=False, idempotentHint=True),
)
def clear_serial_port(self, name: str) -> dict[str, Any]:
def clear_serial_port(self, name: str, host: str | None = None) -> dict[str, Any]:
"""Clear serial port by cycling connection state.
Useful for clearing stuck or stale connections.
Args:
name: VM name
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with result
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -259,11 +268,11 @@ class SerialPortMixin(VSphereMixin):
raise ValueError(f"No network serial port configured on VM '{name}'")
# Disconnect
self.connect_serial_port(name, connected=False)
self.connect_serial_port(name, connected=False, host=host)
time.sleep(1)
# Reconnect
self.connect_serial_port(name, connected=True)
self.connect_serial_port(name, connected=True, host=host)
return {
"vm_name": name,
@ -277,16 +286,18 @@ class SerialPortMixin(VSphereMixin):
description="Remove the network serial port from a VM. VM must be powered off.",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=True),
)
def remove_serial_port(self, name: str) -> str:
def remove_serial_port(self, name: str, host: str | None = None) -> str:
"""Remove serial port from VM.
Args:
name: VM name
host: Managed ESXi host to target (default: the default host)
Returns:
Success message
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -306,6 +317,6 @@ class SerialPortMixin(VSphereMixin):
spec = vim.vm.ConfigSpec()
spec.deviceChange = [serial_spec]
task = vm.ReconfigVM_Task(spec=spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"Serial port removed from VM '{name}'"

View File

@ -55,9 +55,15 @@ class SnapshotsMixin(VSphereMixin):
description="List all snapshots for a virtual machine",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_snapshots(self, name: str) -> list[dict[str, Any]]:
"""List all snapshots for a VM."""
vm = self.conn.find_vm(name)
def list_snapshots(self, name: str, host: str | None = None) -> list[dict[str, Any]]:
"""List all snapshots for a VM.
Args:
name: VM name
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -95,6 +101,7 @@ class SnapshotsMixin(VSphereMixin):
description: str = "",
memory: bool = True,
quiesce: bool = False,
host: str | None = None,
) -> str:
"""Create a VM snapshot.
@ -104,8 +111,10 @@ class SnapshotsMixin(VSphereMixin):
description: Optional description
memory: Include memory state (allows instant restore to running state)
quiesce: Quiesce guest filesystem (requires VMware Tools, ensures consistent state)
host: Managed ESXi host to target (default: the default host)
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -126,7 +135,7 @@ class SnapshotsMixin(VSphereMixin):
memory=memory,
quiesce=quiesce,
)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"Snapshot '{snapshot_name}' created for VM '{name}'"
@ -135,9 +144,18 @@ class SnapshotsMixin(VSphereMixin):
description="Revert a VM to a specific snapshot",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=False),
)
def revert_to_snapshot(self, name: str, snapshot_name: str) -> str:
"""Revert VM to a specific snapshot."""
vm = self.conn.find_vm(name)
def revert_to_snapshot(
self, name: str, snapshot_name: str, host: str | None = None
) -> str:
"""Revert VM to a specific snapshot.
Args:
name: VM name
snapshot_name: Name of snapshot to revert to
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -151,7 +169,7 @@ class SnapshotsMixin(VSphereMixin):
raise ValueError(f"Snapshot '{snapshot_name}' not found on VM '{name}'")
task = snapshot.RevertToSnapshot_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{name}' reverted to snapshot '{snapshot_name}'"
@ -160,9 +178,15 @@ class SnapshotsMixin(VSphereMixin):
description="Revert a VM to its current (most recent) snapshot",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=False),
)
def revert_to_current_snapshot(self, name: str) -> str:
"""Revert VM to its current snapshot."""
vm = self.conn.find_vm(name)
def revert_to_current_snapshot(self, name: str, host: str | None = None) -> str:
"""Revert VM to its current snapshot.
Args:
name: VM name
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -170,7 +194,7 @@ class SnapshotsMixin(VSphereMixin):
raise ValueError(f"VM '{name}' has no current snapshot")
task = vm.RevertToCurrentSnapshot_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{name}' reverted to current snapshot"
@ -180,7 +204,11 @@ class SnapshotsMixin(VSphereMixin):
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=True),
)
def delete_snapshot(
self, name: str, snapshot_name: str, remove_children: bool = False
self,
name: str,
snapshot_name: str,
remove_children: bool = False,
host: str | None = None,
) -> str:
"""Delete a VM snapshot.
@ -188,8 +216,10 @@ class SnapshotsMixin(VSphereMixin):
name: VM name
snapshot_name: Name of snapshot to delete
remove_children: If True, also delete child snapshots
host: Managed ESXi host to target (default: the default host)
"""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -203,7 +233,7 @@ class SnapshotsMixin(VSphereMixin):
raise ValueError(f"Snapshot '{snapshot_name}' not found on VM '{name}'")
task = snapshot.RemoveSnapshot_Task(removeChildren=remove_children)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
msg = f"Snapshot '{snapshot_name}' deleted from VM '{name}'"
if remove_children:
@ -215,9 +245,15 @@ class SnapshotsMixin(VSphereMixin):
description="Delete ALL snapshots from a VM (consolidates disk)",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=True),
)
def delete_all_snapshots(self, name: str) -> str:
"""Delete all snapshots from a VM."""
vm = self.conn.find_vm(name)
def delete_all_snapshots(self, name: str, host: str | None = None) -> str:
"""Delete all snapshots from a VM.
Args:
name: VM name
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -225,7 +261,7 @@ class SnapshotsMixin(VSphereMixin):
return f"VM '{name}' has no snapshots to delete"
task = vm.RemoveAllSnapshots_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"All snapshots deleted from VM '{name}'"
@ -240,9 +276,19 @@ class SnapshotsMixin(VSphereMixin):
snapshot_name: str,
new_name: str | None = None,
new_description: str | None = None,
host: str | None = None,
) -> str:
"""Rename a snapshot or update its description."""
vm = self.conn.find_vm(name)
"""Rename a snapshot or update its description.
Args:
name: VM name
snapshot_name: Name of snapshot to rename
new_name: New name for the snapshot
new_description: New description for the snapshot
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")

View File

@ -43,6 +43,7 @@ class VCenterOpsMixin(VSphereMixin):
vm_name: str,
target_datastore: str,
thin_provision: bool | None = None,
host: str | None = None,
) -> dict[str, Any]:
"""Move a VM's storage to a different datastore.
@ -53,15 +54,17 @@ class VCenterOpsMixin(VSphereMixin):
vm_name: Name of the VM to migrate
target_datastore: Target datastore name
thin_provision: Convert to thin provisioning (None = keep current)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with migration details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
ds = self.conn.find_datastore(target_datastore)
ds = conn.find_datastore(target_datastore)
if not ds:
raise ValueError(f"Datastore '{target_datastore}' not found")
@ -88,7 +91,7 @@ class VCenterOpsMixin(VSphereMixin):
# Perform the relocation
task = vm.RelocateVM_Task(spec=relocate_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -108,6 +111,7 @@ class VCenterOpsMixin(VSphereMixin):
vm_name: str,
disk_label: str,
target_datastore: str,
host: str | None = None,
) -> dict[str, Any]:
"""Move a specific VM disk to a different datastore.
@ -115,15 +119,17 @@ class VCenterOpsMixin(VSphereMixin):
vm_name: Name of the VM
disk_label: Label of the disk (e.g., 'Hard disk 1')
target_datastore: Target datastore name
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with migration details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
ds = self.conn.find_datastore(target_datastore)
ds = conn.find_datastore(target_datastore)
if not ds:
raise ValueError(f"Datastore '{target_datastore}' not found")
@ -157,7 +163,7 @@ class VCenterOpsMixin(VSphereMixin):
# Perform the relocation
task = vm.RelocateVM_Task(spec=relocate_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -176,7 +182,7 @@ class VCenterOpsMixin(VSphereMixin):
description="Convert a VM to a template (idempotent - safe to call on existing template)",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=True),
)
def convert_to_template(self, vm_name: str) -> dict[str, Any]:
def convert_to_template(self, vm_name: str, host: str | None = None) -> dict[str, Any]:
"""Convert a VM to a template.
The VM must be powered off. Once converted, it cannot be powered on
@ -184,11 +190,13 @@ class VCenterOpsMixin(VSphereMixin):
Args:
vm_name: Name of the VM to convert
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with conversion details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
@ -219,17 +227,20 @@ class VCenterOpsMixin(VSphereMixin):
self,
template_name: str,
resource_pool: str | None = None,
host: str | None = None,
) -> dict[str, Any]:
"""Convert a template back to a regular VM.
Args:
template_name: Name of the template
resource_pool: Resource pool for the VM (optional)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with conversion details
"""
vm = self.conn.find_vm(template_name)
conn = self._conn(host)
vm = conn.find_vm(template_name)
if not vm:
raise ValueError(f"Template '{template_name}' not found")
@ -242,20 +253,20 @@ class VCenterOpsMixin(VSphereMixin):
# Get resource pool
if resource_pool:
pool = self._find_resource_pool(resource_pool)
pool = self._find_resource_pool(resource_pool, conn)
if not pool:
raise ValueError(f"Resource pool '{resource_pool}' not found")
else:
pool = self.conn.resource_pool
pool = conn.resource_pool
# Get a host from the resource pool
host = None
target_host = None
if hasattr(pool, "owner") and hasattr(pool.owner, "host"):
hosts = pool.owner.host
if hosts:
host = hosts[0]
target_host = hosts[0]
vm.MarkAsVirtualMachine(pool=pool, host=host)
vm.MarkAsVirtualMachine(pool=pool, host=target_host)
return {
"vm": template_name,
@ -263,10 +274,10 @@ class VCenterOpsMixin(VSphereMixin):
"is_template": False,
}
def _find_resource_pool(self, name: str) -> vim.ResourcePool | None:
def _find_resource_pool(self, name: str, conn) -> vim.ResourcePool | None:
"""Find a resource pool by name."""
container = self.conn.content.viewManager.CreateContainerView(
self.conn.content.rootFolder, [vim.ResourcePool], True
container = conn.content.viewManager.CreateContainerView(
conn.content.rootFolder, [vim.ResourcePool], True
)
try:
for pool in container.view:
@ -287,6 +298,7 @@ class VCenterOpsMixin(VSphereMixin):
new_vm_name: str,
datastore: str | None = None,
power_on: bool = False,
host: str | None = None,
) -> dict[str, Any]:
"""Deploy a new VM from a template.
@ -295,11 +307,13 @@ class VCenterOpsMixin(VSphereMixin):
new_vm_name: Name for the new VM
datastore: Target datastore (default: same as template)
power_on: Power on after deployment (default False)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with deployment details
"""
template = self.conn.find_vm(template_name)
conn = self._conn(host)
template = conn.find_vm(template_name)
if not template:
raise ValueError(f"Template '{template_name}' not found")
@ -307,15 +321,15 @@ class VCenterOpsMixin(VSphereMixin):
raise ValueError(f"'{template_name}' is not a template")
# Check if target VM already exists
if self.conn.find_vm(new_vm_name):
if conn.find_vm(new_vm_name):
raise ValueError(f"VM '{new_vm_name}' already exists")
# Build clone spec
relocate_spec = vim.vm.RelocateSpec()
relocate_spec.pool = self.conn.resource_pool
relocate_spec.pool = conn.resource_pool
if datastore:
ds = self.conn.find_datastore(datastore)
ds = conn.find_datastore(datastore)
if not ds:
raise ValueError(f"Datastore '{datastore}' not found")
relocate_spec.datastore = ds
@ -326,14 +340,14 @@ class VCenterOpsMixin(VSphereMixin):
clone_spec.template = False # Create VM, not another template
# Get target folder
folder = self.conn.datacenter.vmFolder
folder = conn.datacenter.vmFolder
# Clone the template
task = template.Clone(folder=folder, name=new_vm_name, spec=clone_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
# Get the new VM info
new_vm = self.conn.find_vm(new_vm_name)
new_vm = conn.find_vm(new_vm_name)
return {
"vm": new_vm_name,
@ -352,12 +366,16 @@ class VCenterOpsMixin(VSphereMixin):
description="List VM folders in the datacenter",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_folders(self) -> list[dict[str, Any]]:
def list_folders(self, host: str | None = None) -> list[dict[str, Any]]:
"""List all VM folders in the datacenter.
Args:
host: Managed ESXi host to target (default: the default host)
Returns:
List of folder details
"""
conn = self._conn(host)
folders = []
def _collect_folders(folder: vim.Folder, path: str = ""):
@ -375,7 +393,7 @@ class VCenterOpsMixin(VSphereMixin):
_collect_folders(child, current_path)
# Start from VM folder
vm_folder = self.conn.datacenter.vmFolder
vm_folder = conn.datacenter.vmFolder
_collect_folders(vm_folder)
return folders
@ -389,22 +407,25 @@ class VCenterOpsMixin(VSphereMixin):
self,
folder_name: str,
parent_path: str | None = None,
host: str | None = None,
) -> dict[str, Any]:
"""Create a new VM folder.
Args:
folder_name: Name for the new folder
parent_path: Path to parent folder (None = root vm folder)
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with folder details
"""
conn = self._conn(host)
if parent_path:
parent = self._find_folder_by_path(parent_path)
parent = self._find_folder_by_path(parent_path, conn)
if not parent:
raise ValueError(f"Parent folder '{parent_path}' not found")
else:
parent = self.conn.datacenter.vmFolder
parent = conn.datacenter.vmFolder
parent.CreateFolder(name=folder_name)
@ -415,11 +436,11 @@ class VCenterOpsMixin(VSphereMixin):
"path": f"{parent_path}/{folder_name}" if parent_path else f"vm/{folder_name}",
}
def _find_folder_by_path(self, path: str) -> vim.Folder | None:
def _find_folder_by_path(self, path: str, conn) -> vim.Folder | None:
"""Find a folder by its path (e.g., 'vm/Production/WebServers')."""
parts = [p for p in path.split("/") if p and p != "vm"]
current = self.conn.datacenter.vmFolder
current = conn.datacenter.vmFolder
for part in parts:
found = None
if hasattr(current, "childEntity"):
@ -442,21 +463,24 @@ class VCenterOpsMixin(VSphereMixin):
self,
vm_name: str,
folder_path: str,
host: str | None = None,
) -> dict[str, Any]:
"""Move a VM to a different folder.
Args:
vm_name: Name of the VM to move
folder_path: Path to target folder
host: Managed ESXi host to target (default: the default host)
Returns:
Dict with move details
"""
vm = self.conn.find_vm(vm_name)
conn = self._conn(host)
vm = conn.find_vm(vm_name)
if not vm:
raise ValueError(f"VM '{vm_name}' not found")
folder = self._find_folder_by_path(folder_path)
folder = self._find_folder_by_path(folder_path, conn)
if not folder:
raise ValueError(f"Folder '{folder_path}' not found")
@ -465,7 +489,7 @@ class VCenterOpsMixin(VSphereMixin):
# Move the VM
task = folder.MoveIntoFolder_Task([vm])
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return {
"vm": vm_name,
@ -487,17 +511,20 @@ class VCenterOpsMixin(VSphereMixin):
self,
max_count: int = 20,
entity_name: str | None = None,
host: str | None = None,
) -> list[dict[str, Any]]:
"""List recent tasks from vCenter.
Args:
max_count: Maximum number of tasks to return (default 20)
entity_name: Filter by entity name (optional)
host: Managed ESXi host to target (default: the default host)
Returns:
List of task details
"""
task_manager = self.conn.content.taskManager
conn = self._conn(host)
task_manager = conn.content.taskManager
recent_tasks = task_manager.recentTask
tasks = []
@ -543,6 +570,7 @@ class VCenterOpsMixin(VSphereMixin):
max_count: int = 50,
event_types: list[str] | None = None,
hours_back: int = 24,
host: str | None = None,
) -> list[dict[str, Any]]:
"""List recent events from vCenter.
@ -550,11 +578,13 @@ class VCenterOpsMixin(VSphereMixin):
max_count: Maximum number of events (default 50)
event_types: Filter by event type names (optional)
hours_back: How many hours back to look (default 24)
host: Managed ESXi host to target (default: the default host)
Returns:
List of event details
"""
event_manager = self.conn.content.eventManager
conn = self._conn(host)
event_manager = conn.content.eventManager
# Create filter spec
filter_spec = vim.event.EventFilterSpec()
@ -605,15 +635,19 @@ class VCenterOpsMixin(VSphereMixin):
description="List all clusters in the datacenter",
annotations=ToolAnnotations(readOnlyHint=True),
)
def list_clusters(self) -> list[dict[str, Any]]:
def list_clusters(self, host: str | None = None) -> list[dict[str, Any]]:
"""List all clusters in the datacenter.
Args:
host: Managed ESXi host to target (default: the default host)
Returns:
List of cluster details with DRS/HA status
"""
conn = self._conn(host)
clusters = []
for entity in self.conn.datacenter.hostFolder.childEntity:
for entity in conn.datacenter.hostFolder.childEntity:
if isinstance(entity, vim.ClusterComputeResource):
drs_config = entity.configuration.drsConfig
ha_config = entity.configuration.dasConfig
@ -652,16 +686,19 @@ class VCenterOpsMixin(VSphereMixin):
def get_drs_recommendations(
self,
cluster_name: str,
host: str | None = None,
) -> list[dict[str, Any]]:
"""Get DRS recommendations for a cluster.
Args:
cluster_name: Name of the cluster
host: Managed ESXi host to target (default: the default host)
Returns:
List of DRS recommendations
"""
cluster = self._find_cluster(cluster_name)
conn = self._conn(host)
cluster = self._find_cluster(cluster_name, conn)
if not cluster:
raise ValueError(f"Cluster '{cluster_name}' not found")
@ -701,9 +738,9 @@ class VCenterOpsMixin(VSphereMixin):
return recommendations
def _find_cluster(self, name: str) -> vim.ClusterComputeResource | None:
def _find_cluster(self, name: str, conn) -> vim.ClusterComputeResource | None:
"""Find a cluster by name."""
for entity in self.conn.datacenter.hostFolder.childEntity:
for entity in conn.datacenter.hostFolder.childEntity:
if isinstance(entity, vim.ClusterComputeResource) and entity.name == name:
return entity
return None

View File

@ -125,6 +125,7 @@ class VMLifecycleMixin(VSphereMixin):
iso_path: str | None = None,
iso_datastore: str | None = None,
boot_from_iso: bool = True,
host: str | None = None,
) -> str:
"""Create a new virtual machine with specified configuration.
@ -143,18 +144,20 @@ class VMLifecycleMixin(VSphereMixin):
iso_path: ISO path on a datastore (e.g. 'iso/ubuntu.iso') to mount in the CD/DVD drive
iso_datastore: Datastore holding the ISO (default: the VM's datastore)
boot_from_iso: When an ISO is given, put the CD/DVD first in the boot order
host: Managed ESXi host to target (default: the default host)
"""
conn = self._conn(host)
# Resolve datastore
datastore_obj = self.conn.datastore
datastore_obj = conn.datastore
if datastore:
datastore_obj = self.conn.find_datastore(datastore)
datastore_obj = conn.find_datastore(datastore)
if not datastore_obj:
raise ValueError(f"Datastore '{datastore}' not found")
# Resolve network
network_obj = self.conn.network
network_obj = conn.network
if network:
network_obj = self.conn.find_network(network)
network_obj = conn.find_network(network)
if not network_obj:
raise ValueError(f"Network '{network}' not found")
@ -263,10 +266,10 @@ class VMLifecycleMixin(VSphereMixin):
vm_spec.deviceChange = device_specs
# Create VM
task = self.conn.datacenter.vmFolder.CreateVM_Task(
config=vm_spec, pool=self.conn.resource_pool
task = conn.datacenter.vmFolder.CreateVM_Task(
config=vm_spec, pool=conn.resource_pool
)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{name}' created successfully"
@ -281,31 +284,33 @@ class VMLifecycleMixin(VSphereMixin):
new_name: str,
power_on: bool = False,
datastore: str | None = None,
host: str | None = None,
) -> str:
"""Clone a VM from a template or existing VM."""
template_vm = self.conn.find_vm(template_name)
conn = self._conn(host)
template_vm = conn.find_vm(template_name)
if not template_vm:
raise ValueError(f"Template VM '{template_name}' not found")
vm_folder = template_vm.parent
if not isinstance(vm_folder, vim.Folder):
vm_folder = self.conn.datacenter.vmFolder
vm_folder = conn.datacenter.vmFolder
# Resolve datastore
datastore_obj = self.conn.datastore
datastore_obj = conn.datastore
if datastore:
datastore_obj = self.conn.find_datastore(datastore)
datastore_obj = conn.find_datastore(datastore)
if not datastore_obj:
raise ValueError(f"Datastore '{datastore}' not found")
resource_pool = template_vm.resourcePool or self.conn.resource_pool
resource_pool = template_vm.resourcePool or conn.resource_pool
relocate_spec = vim.vm.RelocateSpec(pool=resource_pool, datastore=datastore_obj)
clone_spec = vim.vm.CloneSpec(
powerOn=power_on, template=False, location=relocate_spec
)
task = template_vm.Clone(folder=vm_folder, name=new_name, spec=clone_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{new_name}' cloned from '{template_name}'"
@ -314,19 +319,20 @@ class VMLifecycleMixin(VSphereMixin):
description="Delete a virtual machine permanently (powers off if running)",
annotations=ToolAnnotations(destructiveHint=True, idempotentHint=True),
)
def delete_vm(self, name: str) -> str:
def delete_vm(self, name: str, host: str | None = None) -> str:
"""Delete a virtual machine permanently."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
# Power off if running
if vm.runtime.powerState == vim.VirtualMachine.PowerState.poweredOn:
task = vm.PowerOffVM_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
task = vm.Destroy_Task()
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{name}' deleted"
@ -341,9 +347,11 @@ class VMLifecycleMixin(VSphereMixin):
cpu: int | None = None,
memory_mb: int | None = None,
annotation: str | None = None,
host: str | None = None,
) -> str:
"""Reconfigure VM hardware settings."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
@ -366,7 +374,7 @@ class VMLifecycleMixin(VSphereMixin):
return f"No changes specified for VM '{name}'"
task = vm.ReconfigVM_Task(spec=config_spec)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM '{name}' reconfigured: {', '.join(changes)}"
@ -375,13 +383,14 @@ class VMLifecycleMixin(VSphereMixin):
description="Rename a virtual machine",
annotations=ToolAnnotations(destructiveHint=False, idempotentHint=True),
)
def rename_vm(self, name: str, new_name: str) -> str:
def rename_vm(self, name: str, new_name: str, host: str | None = None) -> str:
"""Rename a virtual machine."""
vm = self.conn.find_vm(name)
conn = self._conn(host)
vm = conn.find_vm(name)
if not vm:
raise ValueError(f"VM '{name}' not found")
task = vm.Rename_Task(newName=new_name)
self.conn.wait_for_task(task)
conn.wait_for_task(task)
return f"VM renamed from '{name}' to '{new_name}'"