implement OAuth authentication with Authentik support

Core OAuth infrastructure:
- permissions.py: 5-level permission model (read_only → full_admin)
  Maps all 94 tools to permission levels
  Maps OAuth groups to permission sets
- audit.py: Centralized logging with OAuth user identity
- auth.py: OIDCProxy configuration for Authentik/OIDC providers
- middleware.py: Permission checking decorator and tool wrapper

Server integration:
- config.py: Add OAuth settings (oauth_enabled, oauth_issuer_url, etc.)
  Validate OAuth config completeness, require HTTP transport
- server.py: Integrate auth provider, add HTTP transport support
  Show OAuth status in startup banner

Deployment:
- docker-compose.oauth.yml: Authentik stack (server, worker, postgres, redis)
- .env.example: Document all OAuth and Authentik environment variables

Permission model:
- vsphere-readers: READ_ONLY (32 tools)
- vsphere-operators: + POWER_OPS (14 tools)
- vsphere-admins: + VM_LIFECYCLE (33 tools)
- vsphere-host-admins: + HOST_ADMIN (6 tools)
- vsphere-super-admins: + FULL_ADMIN (9 tools)
This commit is contained in:
Ryan Malloy 2025-12-27 01:12:58 -07:00
parent f843a8a161
commit cda49f2912
9 changed files with 907 additions and 14 deletions

View File

@ -1,8 +1,8 @@
# ESXi MCP Server Configuration
# mcvsphere Configuration
# Copy this file to .env and fill in your values
# Docker Compose project name (prevents environment clashes)
COMPOSE_PROJECT=esxi-mcp
COMPOSE_PROJECT=mcvsphere
# ─────────────────────────────────────────────────────────────────────────────
# VMware vCenter/ESXi Connection (Required)
@ -35,10 +35,10 @@ VCENTER_INSECURE=true
# API key for authentication (optional, but recommended for production)
# MCP_API_KEY=your-secret-api-key
# Transport type: stdio (for Claude Desktop) or sse (for web/Docker)
# Transport type: stdio (Claude Desktop), sse (web/Docker), http (OAuth)
MCP_TRANSPORT=sse
# Server binding (only used with SSE transport)
# Server binding (only used with SSE/HTTP transport)
MCP_HOST=0.0.0.0
MCP_PORT=8080
@ -49,4 +49,44 @@ MCP_PORT=8080
LOG_LEVEL=INFO
# Log file path (logs to console if not specified)
# LOG_FILE=/app/logs/esxi-mcp.log
# LOG_FILE=/app/logs/mcvsphere.log
# ─────────────────────────────────────────────────────────────────────────────
# OAuth/OIDC Configuration (Optional - requires Authentik or other OIDC provider)
# ─────────────────────────────────────────────────────────────────────────────
# Enable OAuth authentication (requires MCP_TRANSPORT=http or sse)
OAUTH_ENABLED=false
# OIDC issuer URL (Authentik application URL)
# Example: https://auth.example.com/application/o/mcvsphere/
# OAUTH_ISSUER_URL=
# OAuth client credentials (from Authentik application)
# OAUTH_CLIENT_ID=
# OAUTH_CLIENT_SECRET=
# OAuth scopes to request (comma-separated or JSON array)
# OAUTH_SCOPES=["openid", "profile", "email", "groups"]
# OAuth groups required for access (empty = any authenticated user)
# OAUTH_REQUIRED_GROUPS=["vsphere-readers"]
# ─────────────────────────────────────────────────────────────────────────────
# Authentik Deployment (for docker-compose.oauth.yml)
# ─────────────────────────────────────────────────────────────────────────────
# Authentik secret key (generate with: openssl rand -base64 36)
# AUTHENTIK_SECRET_KEY=
# Authentik PostgreSQL password
# AUTHENTIK_DB_PASSWORD=
# Authentik bootstrap admin (first run only)
# AUTHENTIK_BOOTSTRAP_EMAIL=admin@localhost
# AUTHENTIK_BOOTSTRAP_PASSWORD=
# Authentik ports
# AUTHENTIK_PORT=9000
# AUTHENTIK_HTTPS_PORT=9443
# Authentik hostname (for Caddy reverse proxy)
# AUTHENTIK_HOST=auth.localhost

127
docker-compose.oauth.yml Normal file
View File

@ -0,0 +1,127 @@
# OAuth-enabled deployment with Authentik
# Usage: docker compose -f docker-compose.yml -f docker-compose.oauth.yml up
#
# This overlay adds Authentik identity provider for OAuth authentication.
# Requires AUTHENTIK_* environment variables to be set.
services:
# ─────────────────────────────────────────────────────────────────────────
# PostgreSQL for Authentik
# ─────────────────────────────────────────────────────────────────────────
authentik-db:
image: postgres:16-alpine
container_name: mcvsphere-authentik-db
restart: unless-stopped
environment:
POSTGRES_DB: authentik
POSTGRES_USER: authentik
POSTGRES_PASSWORD: ${AUTHENTIK_DB_PASSWORD:?AUTHENTIK_DB_PASSWORD required}
volumes:
- authentik-db-data:/var/lib/postgresql/data
networks:
- authentik-internal
healthcheck:
test: ["CMD-SHELL", "pg_isready -U authentik"]
interval: 10s
timeout: 5s
retries: 5
# ─────────────────────────────────────────────────────────────────────────
# Redis for Authentik
# ─────────────────────────────────────────────────────────────────────────
authentik-redis:
image: redis:7-alpine
container_name: mcvsphere-authentik-redis
restart: unless-stopped
command: ["redis-server", "--save", "60", "1", "--loglevel", "warning"]
volumes:
- authentik-redis-data:/data
networks:
- authentik-internal
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 10s
timeout: 5s
retries: 5
# ─────────────────────────────────────────────────────────────────────────
# Authentik Server
# ─────────────────────────────────────────────────────────────────────────
authentik-server:
image: ghcr.io/goauthentik/server:2024.10.4
container_name: mcvsphere-authentik-server
restart: unless-stopped
command: server
environment:
AUTHENTIK_SECRET_KEY: ${AUTHENTIK_SECRET_KEY:?AUTHENTIK_SECRET_KEY required}
AUTHENTIK_REDIS__HOST: authentik-redis
AUTHENTIK_POSTGRESQL__HOST: authentik-db
AUTHENTIK_POSTGRESQL__USER: authentik
AUTHENTIK_POSTGRESQL__NAME: authentik
AUTHENTIK_POSTGRESQL__PASSWORD: ${AUTHENTIK_DB_PASSWORD}
# Bootstrap admin user (first run only)
AUTHENTIK_BOOTSTRAP_EMAIL: ${AUTHENTIK_BOOTSTRAP_EMAIL:-admin@localhost}
AUTHENTIK_BOOTSTRAP_PASSWORD: ${AUTHENTIK_BOOTSTRAP_PASSWORD:-}
ports:
- "${AUTHENTIK_PORT:-9000}:9000"
- "${AUTHENTIK_HTTPS_PORT:-9443}:9443"
volumes:
- authentik-media:/media
- authentik-templates:/templates
depends_on:
authentik-db:
condition: service_healthy
authentik-redis:
condition: service_healthy
networks:
- authentik-internal
- mcvsphere-network
healthcheck:
test: ["CMD", "ak", "healthcheck"]
interval: 30s
timeout: 10s
retries: 3
start_period: 60s
labels:
# Caddy reverse proxy (if using caddy-docker-proxy)
caddy: ${AUTHENTIK_HOST:-auth.localhost}
caddy.reverse_proxy: "{{upstreams 9000}}"
# ─────────────────────────────────────────────────────────────────────────
# Authentik Worker (background tasks)
# ─────────────────────────────────────────────────────────────────────────
authentik-worker:
image: ghcr.io/goauthentik/server:2024.10.4
container_name: mcvsphere-authentik-worker
restart: unless-stopped
command: worker
environment:
AUTHENTIK_SECRET_KEY: ${AUTHENTIK_SECRET_KEY}
AUTHENTIK_REDIS__HOST: authentik-redis
AUTHENTIK_POSTGRESQL__HOST: authentik-db
AUTHENTIK_POSTGRESQL__USER: authentik
AUTHENTIK_POSTGRESQL__NAME: authentik
AUTHENTIK_POSTGRESQL__PASSWORD: ${AUTHENTIK_DB_PASSWORD}
volumes:
- authentik-media:/media
- authentik-templates:/templates
depends_on:
authentik-db:
condition: service_healthy
authentik-redis:
condition: service_healthy
networks:
- authentik-internal
networks:
authentik-internal:
driver: bridge
mcvsphere-network:
external: true
name: ${COMPOSE_PROJECT_NAME:-mcvsphere}_default
volumes:
authentik-db-data:
authentik-redis-data:
authentik-media:
authentik-templates:

186
src/mcvsphere/audit.py Normal file
View File

@ -0,0 +1,186 @@
"""Audit logging for OAuth-authenticated operations.
Provides centralized logging with OAuth user identity for all tool invocations.
"""
import logging
from contextvars import ContextVar
from datetime import UTC, datetime
from typing import Any
logger = logging.getLogger("mcvsphere.audit")
# Context variable to store current user info for the request
_current_user: ContextVar[dict[str, Any] | None] = ContextVar("current_user", default=None)
def set_current_user(user_info: dict[str, Any] | None) -> None:
"""Set the current user for this request context.
Args:
user_info: User information extracted from OAuth token, or None for anonymous.
"""
_current_user.set(user_info)
def get_current_user() -> dict[str, Any] | None:
"""Get the current user for this request context.
Returns:
User information dict or None if no user is authenticated.
"""
return _current_user.get()
def get_username() -> str:
"""Get the username of the current user.
Returns:
Username string, or 'anonymous' if no user is authenticated.
"""
user = get_current_user()
if not user:
return "anonymous"
# Try common OAuth claim names in order of preference
for claim in ("preferred_username", "email", "sub"):
if value := user.get(claim):
return str(value)
return "unknown"
def get_user_groups() -> list[str]:
"""Get the groups of the current user.
Returns:
List of group names, or empty list if none.
"""
user = get_current_user()
if not user:
return []
return user.get("groups", [])
def _sanitize_args(args: dict[str, Any]) -> dict[str, Any]:
"""Remove sensitive values from args for logging.
Args:
args: Tool arguments dict.
Returns:
Sanitized args with sensitive values redacted.
"""
sensitive_patterns = {"password", "secret", "token", "credential", "key"}
def is_sensitive(key: str) -> bool:
key_lower = key.lower()
return any(pattern in key_lower for pattern in sensitive_patterns)
return {k: "***REDACTED***" if is_sensitive(k) else v for k, v in args.items()}
def _truncate(value: str | None, max_length: int = 200) -> str | None:
"""Truncate a string value for logging.
Args:
value: String to truncate.
max_length: Maximum length.
Returns:
Truncated string or None.
"""
if value is None:
return None
if len(value) <= max_length:
return value
return value[:max_length] + "..."
def audit_log(
tool_name: str,
args: dict[str, Any],
result: str | None = None,
error: str | None = None,
duration_ms: float | None = None,
) -> None:
"""Log a tool invocation with OAuth user identity.
Args:
tool_name: Name of the MCP tool invoked.
args: Tool arguments (will be sanitized).
result: Tool result string (will be truncated).
error: Error message if tool failed.
duration_ms: Execution time in milliseconds.
"""
username = get_username()
groups = get_user_groups()
log_entry = {
"timestamp": datetime.now(UTC).isoformat(),
"user": username,
"groups": groups,
"tool": tool_name,
"args": _sanitize_args(args),
"duration_ms": round(duration_ms, 2) if duration_ms else None,
"result": _truncate(result),
"error": error,
}
# Remove None values for cleaner logs
log_entry = {k: v for k, v in log_entry.items() if v is not None}
if error:
logger.warning("AUDIT_FAIL: %s", log_entry)
else:
logger.info("AUDIT: %s", log_entry)
def audit_permission_denied(
tool_name: str,
args: dict[str, Any],
required_permission: str,
) -> None:
"""Log a permission denied event.
Args:
tool_name: Name of the MCP tool attempted.
args: Tool arguments (will be sanitized).
required_permission: The permission level that was required.
"""
username = get_username()
groups = get_user_groups()
log_entry = {
"timestamp": datetime.now(UTC).isoformat(),
"user": username,
"groups": groups,
"tool": tool_name,
"args": _sanitize_args(args),
"required_permission": required_permission,
"event": "PERMISSION_DENIED",
}
logger.warning("AUDIT_DENIED: %s", log_entry)
def audit_auth_event(
event_type: str,
username: str | None = None,
details: dict[str, Any] | None = None,
) -> None:
"""Log an authentication event.
Args:
event_type: Type of auth event (login, logout, token_refresh, etc.)
username: Username if known.
details: Additional event details.
"""
log_entry = {
"timestamp": datetime.now(UTC).isoformat(),
"event": event_type,
"user": username or get_username(),
**(details or {}),
}
logger.info("AUTH_EVENT: %s", log_entry)

63
src/mcvsphere/auth.py Normal file
View File

@ -0,0 +1,63 @@
"""OAuth authentication configuration for mcvsphere.
Provides OIDCProxy configuration for Authentik or other OIDC providers.
"""
import logging
from mcvsphere.config import Settings
logger = logging.getLogger(__name__)
def create_auth_provider(settings: Settings):
"""Create OAuth provider if enabled.
Args:
settings: Application settings with OAuth configuration.
Returns:
OIDCProxy instance if OAuth is enabled, None otherwise.
"""
if not settings.oauth_enabled:
logger.debug("OAuth authentication disabled")
return None
# Import here to avoid loading auth dependencies when not needed
from fastmcp.server.auth import OIDCProxy
# Build the OIDC config URL from issuer URL
# Authentik format: https://auth.example.com/application/o/<app>/
# Discovery URL: https://auth.example.com/application/o/<app>/.well-known/openid-configuration
issuer_url = settings.oauth_issuer_url.rstrip("/")
if not issuer_url.endswith("/.well-known/openid-configuration"):
config_url = f"{issuer_url}/.well-known/openid-configuration"
else:
config_url = issuer_url
# Build base URL for the MCP server
base_url = f"http://{settings.mcp_host}:{settings.mcp_port}"
logger.info("Configuring OAuth with OIDC provider: %s", issuer_url)
try:
auth = OIDCProxy(
config_url=config_url,
client_id=settings.oauth_client_id,
client_secret=settings.oauth_client_secret.get_secret_value(),
base_url=base_url,
required_scopes=settings.oauth_scopes,
# Allow Claude Code localhost redirects
allowed_client_redirect_uris=[
"http://localhost:*",
"http://127.0.0.1:*",
],
# Skip consent screen for MCP clients (they're already trusted)
require_authorization_consent=False,
)
logger.info("OAuth authentication enabled via OIDC")
return auth
except Exception as e:
logger.error("Failed to configure OAuth: %s", e)
raise ValueError(f"OAuth configuration failed: {e}") from e

View File

@ -5,7 +5,7 @@ from pathlib import Path
from typing import Literal
import yaml
from pydantic import Field, SecretStr, field_validator
from pydantic import Field, SecretStr, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
@ -49,8 +49,31 @@ class Settings(BaseSettings):
)
mcp_host: str = Field(default="0.0.0.0", description="Server bind address")
mcp_port: int = Field(default=8080, description="Server port")
mcp_transport: Literal["stdio", "sse"] = Field(
default="stdio", description="MCP transport type"
mcp_transport: Literal["stdio", "sse", "http"] = Field(
default="stdio", description="MCP transport type (http required for OAuth)"
)
# OAuth/OIDC settings
oauth_enabled: bool = Field(
default=False, description="Enable OAuth authentication via OIDC"
)
oauth_issuer_url: str | None = Field(
default=None,
description="OIDC issuer URL (e.g., https://auth.example.com/application/o/mcvsphere/)",
)
oauth_client_id: str | None = Field(
default=None, description="OAuth client ID from OIDC provider"
)
oauth_client_secret: SecretStr | None = Field(
default=None, description="OAuth client secret from OIDC provider"
)
oauth_scopes: list[str] = Field(
default_factory=lambda: ["openid", "profile", "email", "groups"],
description="OAuth scopes to request",
)
oauth_required_groups: list[str] = Field(
default_factory=list,
description="OAuth groups required for access (empty = any authenticated user)",
)
# Logging settings
@ -61,13 +84,35 @@ class Settings(BaseSettings):
default=None, description="Log file path (logs to console if not specified)"
)
@field_validator("vcenter_insecure", mode="before")
@field_validator("vcenter_insecure", "oauth_enabled", mode="before")
@classmethod
def parse_bool(cls, v: str | bool) -> bool:
if isinstance(v, bool):
return v
return v.lower() in ("true", "1", "yes", "on")
@model_validator(mode="after")
def validate_oauth_config(self) -> "Settings":
"""Validate OAuth configuration is complete when enabled."""
if self.oauth_enabled:
missing = []
if not self.oauth_issuer_url:
missing.append("oauth_issuer_url")
if not self.oauth_client_id:
missing.append("oauth_client_id")
if not self.oauth_client_secret:
missing.append("oauth_client_secret")
if missing:
raise ValueError(
f"OAuth is enabled but missing required settings: {', '.join(missing)}"
)
# OAuth requires HTTP transport
if self.mcp_transport == "stdio":
raise ValueError(
"OAuth requires HTTP transport. Set mcp_transport='http' or 'sse'"
)
return self
@classmethod
def from_yaml(cls, path: Path) -> "Settings":
"""Load settings from a YAML file, with env vars taking precedence."""

181
src/mcvsphere/middleware.py Normal file
View File

@ -0,0 +1,181 @@
"""Middleware for permission checking and audit logging.
Provides decorators and hooks for wrapping tool execution with:
- OAuth permission validation
- Audit logging with user identity
"""
import time
from collections.abc import Callable
from functools import wraps
from typing import Any
from mcvsphere.audit import (
audit_log,
audit_permission_denied,
get_current_user,
get_user_groups,
set_current_user,
)
from mcvsphere.permissions import (
PermissionDeniedError,
PermissionLevel,
check_permission,
get_required_permission,
)
def with_permission_check(tool_name: str) -> Callable:
"""Decorator factory for permission checking and audit logging.
Args:
tool_name: Name of the MCP tool being wrapped.
Returns:
Decorator that wraps the tool function with permission checks and audit logging.
Example:
@with_permission_check("power_on")
def power_on(self, name: str) -> str:
...
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
# Get user groups from current context
groups = get_user_groups()
user = get_current_user()
username = "anonymous"
if user:
username = user.get(
"preferred_username", user.get("email", user.get("sub", "unknown"))
)
# Check permission
if not check_permission(tool_name, groups):
required = get_required_permission(tool_name)
audit_permission_denied(tool_name, kwargs, required.value)
raise PermissionDeniedError(username, tool_name, required)
# Execute tool with timing
start_time = time.perf_counter()
try:
result = func(*args, **kwargs)
duration_ms = (time.perf_counter() - start_time) * 1000
audit_log(tool_name, kwargs, result=str(result), duration_ms=duration_ms)
return result
except PermissionDeniedError:
# Re-raise permission errors without additional logging
raise
except Exception as e:
duration_ms = (time.perf_counter() - start_time) * 1000
audit_log(tool_name, kwargs, error=str(e), duration_ms=duration_ms)
raise
return wrapper
return decorator
def extract_user_from_context(ctx) -> dict[str, Any] | None:
"""Extract user information from FastMCP context.
Args:
ctx: FastMCP Context object.
Returns:
User info dict from OAuth token claims, or None if not authenticated.
"""
if ctx is None:
return None
# Try to get access token from context
try:
# FastMCP stores the access token in request_context
if hasattr(ctx, "request_context") and ctx.request_context:
token = getattr(ctx.request_context, "access_token", None)
if token and hasattr(token, "claims"):
return token.claims
except Exception:
pass
return None
def setup_user_context(ctx) -> None:
"""Set up user context from FastMCP context for the current request.
Call this at the start of request handling to make user info
available throughout the request via get_current_user().
Args:
ctx: FastMCP Context object.
"""
user_info = extract_user_from_context(ctx)
set_current_user(user_info)
class PermissionMiddleware:
"""Middleware for adding permission checks to all tools.
This can be used to wrap mixin tool registration with permission checking.
"""
def __init__(self, oauth_enabled: bool = False):
"""Initialize middleware.
Args:
oauth_enabled: Whether OAuth authentication is enabled.
"""
self.oauth_enabled = oauth_enabled
def wrap_tool(self, tool_name: str, func: Callable) -> Callable:
"""Wrap a tool function with permission checking.
Args:
tool_name: Name of the tool.
func: Original tool function.
Returns:
Wrapped function with permission checks.
"""
if not self.oauth_enabled:
# No auth - just add basic audit logging
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
start_time = time.perf_counter()
try:
result = func(*args, **kwargs)
duration_ms = (time.perf_counter() - start_time) * 1000
audit_log(tool_name, kwargs, result=str(result), duration_ms=duration_ms)
return result
except Exception as e:
duration_ms = (time.perf_counter() - start_time) * 1000
audit_log(tool_name, kwargs, error=str(e), duration_ms=duration_ms)
raise
return wrapper
# With auth - add permission checking
return with_permission_check(tool_name)(func)
def get_permission_summary() -> dict[str, list[str]]:
"""Get a summary of tools grouped by permission level.
Returns:
Dict mapping permission level names to lists of tool names.
"""
from mcvsphere.permissions import TOOL_PERMISSIONS
summary: dict[str, list[str]] = {level.value: [] for level in PermissionLevel}
for tool_name, level in TOOL_PERMISSIONS.items():
summary[level.value].append(tool_name)
# Sort tool names within each level
for level in summary:
summary[level].sort()
return summary

View File

@ -0,0 +1,238 @@
"""Permission escalation based on OAuth claims.
Defines permission levels and maps:
- Tools Required permission level
- OAuth groups Granted permission levels
"""
from enum import Enum
class PermissionLevel(Enum):
"""Permission levels for tool access, from least to most privileged."""
READ_ONLY = "read_only" # View-only operations
POWER_OPS = "power_ops" # Power on/off, snapshots
VM_LIFECYCLE = "vm_lifecycle" # Create/delete/modify VMs
HOST_ADMIN = "host_admin" # ESXi host operations
FULL_ADMIN = "full_admin" # Everything including guest ops, services
# Tool → Required Permission mapping
# Default is READ_ONLY if not listed
TOOL_PERMISSIONS: dict[str, PermissionLevel] = {
# ═══════════════════════════════════════════════════════════════════════
# READ_ONLY - Safe viewing operations (36 tools)
# ═══════════════════════════════════════════════════════════════════════
"list_vms": PermissionLevel.READ_ONLY,
"get_vm_info": PermissionLevel.READ_ONLY,
"list_snapshots": PermissionLevel.READ_ONLY,
"get_vm_stats": PermissionLevel.READ_ONLY,
"get_host_stats": PermissionLevel.READ_ONLY,
"list_hosts": PermissionLevel.READ_ONLY,
"get_recent_tasks": PermissionLevel.READ_ONLY,
"get_recent_events": PermissionLevel.READ_ONLY,
"get_alarms": PermissionLevel.READ_ONLY,
"browse_datastore": PermissionLevel.READ_ONLY,
"get_datastore_info": PermissionLevel.READ_ONLY,
"get_network_info": PermissionLevel.READ_ONLY,
"get_resource_pool_info": PermissionLevel.READ_ONLY,
"list_templates": PermissionLevel.READ_ONLY,
"get_vcenter_info": PermissionLevel.READ_ONLY,
"list_disks": PermissionLevel.READ_ONLY,
"list_nics": PermissionLevel.READ_ONLY,
"list_ovf_networks": PermissionLevel.READ_ONLY,
"get_host_info": PermissionLevel.READ_ONLY,
"list_services": PermissionLevel.READ_ONLY,
"get_ntp_config": PermissionLevel.READ_ONLY,
"get_host_hardware": PermissionLevel.READ_ONLY,
"get_host_networking": PermissionLevel.READ_ONLY,
"list_folders": PermissionLevel.READ_ONLY,
"list_recent_tasks": PermissionLevel.READ_ONLY,
"list_recent_events": PermissionLevel.READ_ONLY,
"list_clusters": PermissionLevel.READ_ONLY,
"get_drs_recommendations": PermissionLevel.READ_ONLY,
"get_serial_port": PermissionLevel.READ_ONLY,
"wait_for_vm_tools": PermissionLevel.READ_ONLY,
"get_vm_tools_status": PermissionLevel.READ_ONLY,
"vm_screenshot": PermissionLevel.READ_ONLY,
# ═══════════════════════════════════════════════════════════════════════
# POWER_OPS - Power and snapshot operations (14 tools)
# ═══════════════════════════════════════════════════════════════════════
"power_on": PermissionLevel.POWER_OPS,
"power_off": PermissionLevel.POWER_OPS,
"shutdown_guest": PermissionLevel.POWER_OPS,
"reboot_guest": PermissionLevel.POWER_OPS,
"reset_vm": PermissionLevel.POWER_OPS,
"suspend_vm": PermissionLevel.POWER_OPS,
"standby_guest": PermissionLevel.POWER_OPS,
"create_snapshot": PermissionLevel.POWER_OPS,
"revert_to_snapshot": PermissionLevel.POWER_OPS,
"revert_to_current_snapshot": PermissionLevel.POWER_OPS,
"delete_snapshot": PermissionLevel.POWER_OPS,
"delete_all_snapshots": PermissionLevel.POWER_OPS,
"rename_snapshot": PermissionLevel.POWER_OPS,
"connect_nic": PermissionLevel.POWER_OPS, # Connect/disconnect is power-level
# ═══════════════════════════════════════════════════════════════════════
# VM_LIFECYCLE - Create/delete/modify VMs (28 tools)
# ═══════════════════════════════════════════════════════════════════════
"create_vm": PermissionLevel.VM_LIFECYCLE,
"clone_vm": PermissionLevel.VM_LIFECYCLE,
"delete_vm": PermissionLevel.VM_LIFECYCLE,
"reconfigure_vm": PermissionLevel.VM_LIFECYCLE,
"rename_vm": PermissionLevel.VM_LIFECYCLE,
"add_disk": PermissionLevel.VM_LIFECYCLE,
"remove_disk": PermissionLevel.VM_LIFECYCLE,
"extend_disk": PermissionLevel.VM_LIFECYCLE,
"attach_iso": PermissionLevel.VM_LIFECYCLE,
"detach_iso": PermissionLevel.VM_LIFECYCLE,
"add_nic": PermissionLevel.VM_LIFECYCLE,
"remove_nic": PermissionLevel.VM_LIFECYCLE,
"change_nic_network": PermissionLevel.VM_LIFECYCLE,
"set_nic_mac": PermissionLevel.VM_LIFECYCLE,
"deploy_ovf": PermissionLevel.VM_LIFECYCLE,
"export_vm_ovf": PermissionLevel.VM_LIFECYCLE,
"convert_to_template": PermissionLevel.VM_LIFECYCLE,
"convert_to_vm": PermissionLevel.VM_LIFECYCLE,
"deploy_from_template": PermissionLevel.VM_LIFECYCLE,
"create_folder": PermissionLevel.VM_LIFECYCLE,
"move_vm_to_folder": PermissionLevel.VM_LIFECYCLE,
"storage_vmotion": PermissionLevel.VM_LIFECYCLE,
"move_vm_disk": PermissionLevel.VM_LIFECYCLE,
"setup_serial_port": PermissionLevel.VM_LIFECYCLE,
"connect_serial_port": PermissionLevel.VM_LIFECYCLE,
"clear_serial_port": PermissionLevel.VM_LIFECYCLE,
"remove_serial_port": PermissionLevel.VM_LIFECYCLE,
# Datastore modifications
"download_from_datastore": PermissionLevel.VM_LIFECYCLE,
"upload_to_datastore": PermissionLevel.VM_LIFECYCLE,
"delete_datastore_file": PermissionLevel.VM_LIFECYCLE,
"create_datastore_folder": PermissionLevel.VM_LIFECYCLE,
"move_datastore_file": PermissionLevel.VM_LIFECYCLE,
"copy_datastore_file": PermissionLevel.VM_LIFECYCLE,
# ═══════════════════════════════════════════════════════════════════════
# HOST_ADMIN - ESXi host operations (6 tools)
# ═══════════════════════════════════════════════════════════════════════
"enter_maintenance_mode": PermissionLevel.HOST_ADMIN,
"exit_maintenance_mode": PermissionLevel.HOST_ADMIN,
"reboot_host": PermissionLevel.HOST_ADMIN,
"shutdown_host": PermissionLevel.HOST_ADMIN,
"configure_ntp": PermissionLevel.HOST_ADMIN,
"set_service_policy": PermissionLevel.HOST_ADMIN,
# ═══════════════════════════════════════════════════════════════════════
# FULL_ADMIN - Everything including guest OS and service control (10 tools)
# ═══════════════════════════════════════════════════════════════════════
"start_service": PermissionLevel.FULL_ADMIN,
"stop_service": PermissionLevel.FULL_ADMIN,
# Guest OS operations (requires guest credentials, high privilege)
"run_command_in_guest": PermissionLevel.FULL_ADMIN,
"list_guest_processes": PermissionLevel.FULL_ADMIN,
"read_guest_file": PermissionLevel.FULL_ADMIN,
"write_guest_file": PermissionLevel.FULL_ADMIN,
"list_guest_directory": PermissionLevel.FULL_ADMIN,
"create_guest_directory": PermissionLevel.FULL_ADMIN,
"delete_guest_file": PermissionLevel.FULL_ADMIN,
}
# OAuth Group → Granted Permission Levels
# Users inherit all permissions from their groups (union of all group permissions)
GROUP_PERMISSIONS: dict[str, set[PermissionLevel]] = {
# View-only access
"vsphere-readers": {
PermissionLevel.READ_ONLY,
},
# Operators can power on/off, manage snapshots
"vsphere-operators": {
PermissionLevel.READ_ONLY,
PermissionLevel.POWER_OPS,
},
# Admins can create/delete/modify VMs
"vsphere-admins": {
PermissionLevel.READ_ONLY,
PermissionLevel.POWER_OPS,
PermissionLevel.VM_LIFECYCLE,
},
# Host admins can manage ESXi hosts
"vsphere-host-admins": {
PermissionLevel.READ_ONLY,
PermissionLevel.POWER_OPS,
PermissionLevel.VM_LIFECYCLE,
PermissionLevel.HOST_ADMIN,
},
# Super admins have full access
"vsphere-super-admins": {
PermissionLevel.READ_ONLY,
PermissionLevel.POWER_OPS,
PermissionLevel.VM_LIFECYCLE,
PermissionLevel.HOST_ADMIN,
PermissionLevel.FULL_ADMIN,
},
}
class PermissionDeniedError(Exception):
"""Raised when user lacks permission for an operation."""
def __init__(self, username: str, tool_name: str, required: PermissionLevel):
self.username = username
self.tool_name = tool_name
self.required = required
super().__init__(
f"Permission denied: {username} lacks '{required.value}' permission for '{tool_name}'"
)
def get_user_permissions(groups: list[str] | None) -> set[PermissionLevel]:
"""Extract permissions from OAuth groups.
Args:
groups: List of OAuth group names from token claims.
Returns:
Set of granted permission levels (union of all group permissions).
"""
if not groups:
return {PermissionLevel.READ_ONLY}
permissions: set[PermissionLevel] = set()
for group in groups:
if group in GROUP_PERMISSIONS:
permissions.update(GROUP_PERMISSIONS[group])
# Default to read-only if no recognized groups
if not permissions:
permissions.add(PermissionLevel.READ_ONLY)
return permissions
def get_required_permission(tool_name: str) -> PermissionLevel:
"""Get required permission level for a tool.
Args:
tool_name: Name of the MCP tool.
Returns:
Required permission level (defaults to READ_ONLY if not mapped).
"""
return TOOL_PERMISSIONS.get(tool_name, PermissionLevel.READ_ONLY)
def check_permission(
tool_name: str,
groups: list[str] | None,
) -> bool:
"""Check if user has permission for a tool.
Args:
tool_name: Name of the MCP tool to check.
groups: OAuth groups from token claims.
Returns:
True if user has required permission, False otherwise.
"""
required = get_required_permission(tool_name)
user_perms = get_user_permissions(groups)
return required in user_perms

View File

@ -6,6 +6,7 @@ from pathlib import Path
from fastmcp import FastMCP
from mcvsphere.auth import create_auth_provider
from mcvsphere.config import Settings, get_settings
from mcvsphere.connection import VMwareConnection
from mcvsphere.mixins import (
@ -53,6 +54,9 @@ def create_server(settings: Settings | None = None) -> FastMCP:
stream=sys.stderr, # Explicitly use stderr
)
# Create auth provider if OAuth enabled
auth = create_auth_provider(settings)
# Create FastMCP server
mcp = FastMCP(
name="mcvsphere",
@ -61,6 +65,7 @@ def create_server(settings: Settings | None = None) -> FastMCP:
"Provides tools for VM lifecycle management, power operations, "
"snapshots, guest OS operations, monitoring, and infrastructure resources."
),
auth=auth,
)
# Create shared VMware connection
@ -114,8 +119,8 @@ def run_server(config_path: Path | None = None) -> None:
# Load settings
settings = Settings.from_yaml(config_path) if config_path else get_settings()
# Only print banner for SSE mode (stdio must stay clean for JSON-RPC)
if settings.mcp_transport == "sse":
# Only print banner for HTTP/SSE modes (stdio must stay clean for JSON-RPC)
if settings.mcp_transport in ("sse", "http"):
try:
from importlib.metadata import version
@ -125,15 +130,23 @@ def run_server(config_path: Path | None = None) -> None:
print(f"mcvsphere v{package_version}", file=sys.stderr)
print("" * 40, file=sys.stderr)
transport_name = "HTTP" if settings.mcp_transport == "http" else "SSE"
print(
f"Starting SSE transport on {settings.mcp_host}:{settings.mcp_port}",
f"Starting {transport_name} transport on {settings.mcp_host}:{settings.mcp_port}",
file=sys.stderr,
)
if settings.oauth_enabled:
print(f"OAuth: ENABLED via {settings.oauth_issuer_url}", file=sys.stderr)
else:
print("OAuth: disabled", file=sys.stderr)
print("" * 40, file=sys.stderr)
# Create and run server
mcp = create_server(settings)
if settings.mcp_transport == "sse":
if settings.mcp_transport == "http":
mcp.run(transport="streamable-http", host=settings.mcp_host, port=settings.mcp_port)
elif settings.mcp_transport == "sse":
mcp.run(transport="sse", host=settings.mcp_host, port=settings.mcp_port)
else:
# stdio mode - suppress banner to keep stdout clean for JSON-RPC

2
uv.lock generated
View File

@ -729,7 +729,7 @@ wheels = [
[[package]]
name = "mcvsphere"
version = "0.2.0"
version = "0.2.1"
source = { editable = "." }
dependencies = [
{ name = "fastmcp" },