"""Docker management mixin for GhydraMCP. Provides tools for managing Ghidra Docker containers programmatically. Allows the MCP server to automatically start containers when Ghidra isn't available. Supports dynamic container provisioning with port pooling to avoid contention when multiple Claude processes share the same MCP server. """ import asyncio import fcntl import json import os import shutil import subprocess import time import uuid from pathlib import Path from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool # Port pool configuration (32 ports should handle many concurrent sessions) PORT_POOL_START = 8192 PORT_POOL_END = 8223 PORT_LOCK_DIR = Path("/tmp/ghydramcp-ports") class PortPool: """Manages a pool of ports for GhydraMCP containers. Uses file-based locking to coordinate port allocation across multiple processes. Each allocated port gets a lock file that persists until the container is stopped. """ def __init__( self, start: int = PORT_POOL_START, end: int = PORT_POOL_END, lock_dir: Path = PORT_LOCK_DIR, ): self.start = start self.end = end self.lock_dir = lock_dir self.lock_dir.mkdir(parents=True, exist_ok=True) def _lock_file(self, port: int) -> Path: """Get the lock file path for a port.""" return self.lock_dir / f"port-{port}.lock" def _try_acquire_port(self, port: int, session_id: str) -> bool: """Try to acquire a specific port. Uses flock for cross-process synchronization. Args: port: Port number to acquire session_id: Session ID to associate with the port Returns: True if port was acquired, False if already in use """ lock_path = self._lock_file(port) try: # Open or create the lock file fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644) # Try to get an exclusive lock (non-blocking) try: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) except (IOError, OSError): # Port is locked by another process os.close(fd) return False # Write session info to the lock file os.ftruncate(fd, 0) os.lseek(fd, 0, os.SEEK_SET) lock_data = json.dumps({ "session_id": session_id, "pid": os.getpid(), "timestamp": time.time(), }) os.write(fd, lock_data.encode()) # Keep the file descriptor open to maintain the lock # Store it so we can release later if not hasattr(self, '_lock_fds'): self._lock_fds = {} self._lock_fds[port] = fd return True except Exception: return False def allocate(self, session_id: str) -> Optional[int]: """Allocate an available port from the pool. Args: session_id: Session ID to associate with the allocation Returns: Allocated port number, or None if pool exhausted """ for port in range(self.start, self.end + 1): if self._try_acquire_port(port, session_id): return port return None def release(self, port: int) -> bool: """Release a port back to the pool. Args: port: Port number to release Returns: True if released, False if not held """ if not hasattr(self, '_lock_fds') or port not in self._lock_fds: return False try: fd = self._lock_fds.pop(port) fcntl.flock(fd, fcntl.LOCK_UN) os.close(fd) # Remove the lock file lock_path = self._lock_file(port) if lock_path.exists(): lock_path.unlink() return True except Exception: return False def get_allocated_ports(self) -> Dict[int, Dict[str, Any]]: """Get info about all currently allocated ports. Returns: Dict mapping port numbers to their allocation info """ allocated = {} for port in range(self.start, self.end + 1): lock_path = self._lock_file(port) if lock_path.exists(): try: with open(lock_path, 'r') as f: data = json.load(f) allocated[port] = data except (json.JSONDecodeError, IOError): # Lock file exists but can't be read - port is likely in use allocated[port] = {"session_id": "unknown", "status": "locked"} return allocated def cleanup_stale_locks(self, max_age_seconds: float = 3600) -> List[int]: """Clean up stale lock files from crashed processes. Args: max_age_seconds: Max age for a lock file to be considered stale Returns: List of ports that were cleaned up """ cleaned = [] for port in range(self.start, self.end + 1): lock_path = self._lock_file(port) if not lock_path.exists(): continue try: # Check if the lock file is stale mtime = lock_path.stat().st_mtime age = time.time() - mtime if age > max_age_seconds: # Try to acquire the lock - if we can, the owning process is gone fd = os.open(str(lock_path), os.O_RDWR) try: fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) # We got the lock, so the original holder is gone os.close(fd) lock_path.unlink() cleaned.append(port) except (IOError, OSError): # Still locked by another process os.close(fd) except Exception: pass return cleaned class DockerMixin(MCPMixin): """Docker container management for GhydraMCP. Provides tools to start, stop, and manage Ghidra containers with the GhydraMCP plugin pre-installed. Supports multi-process environments with: - Dynamic port allocation from a pool (8192-8223) - Session-scoped container naming with UUIDs - Docker label-based tracking for cross-process visibility - Automatic cleanup of orphaned containers """ # Session ID for this MCP server instance _session_id: Optional[str] = None # Port pool for dynamic allocation _port_pool: Optional[PortPool] = None # Track containers started by this session _session_containers: Dict[str, Dict[str, Any]] = {} # Label prefix for GhydraMCP containers LABEL_PREFIX = "com.ghydramcp" def __init__(self): """Initialize Docker mixin with session isolation.""" self._check_docker_available() self._session_id = str(uuid.uuid4())[:8] self._port_pool = PortPool() self._session_containers = {} @property def session_id(self) -> str: """Get the session ID for this MCP instance.""" if self._session_id is None: self._session_id = str(uuid.uuid4())[:8] return self._session_id def _check_docker_available(self) -> bool: """Check if Docker is available on the system.""" return shutil.which("docker") is not None def _run_docker_cmd_sync( self, args: List[str], check: bool = True, capture: bool = True ) -> subprocess.CompletedProcess: """Run a docker command synchronously (internal use only). Args: args: Command arguments (after 'docker') check: Raise exception on non-zero exit capture: Capture stdout/stderr Returns: CompletedProcess result """ cmd = ["docker"] + args return subprocess.run( cmd, check=check, capture_output=capture, text=True, ) async def _run_docker_cmd( self, args: List[str], check: bool = True, capture: bool = True ) -> subprocess.CompletedProcess: """Run a docker command without blocking the event loop. Uses run_in_executor to run subprocess in thread pool. Args: args: Command arguments (after 'docker') check: Raise exception on non-zero exit capture: Capture stdout/stderr Returns: CompletedProcess result """ loop = asyncio.get_event_loop() return await loop.run_in_executor( None, self._run_docker_cmd_sync, args, check, capture ) def _run_compose_cmd( self, args: List[str], project_dir: Optional[Path] = None, check: bool = True, capture: bool = True, ) -> subprocess.CompletedProcess: """Run a docker compose command. Args: args: Command arguments (after 'docker compose') project_dir: Directory containing docker-compose.yml check: Raise exception on non-zero exit capture: Capture stdout/stderr Returns: CompletedProcess result """ cmd = ["docker", "compose"] # Use project directory if specified if project_dir: cmd.extend(["-f", str(project_dir / "docker-compose.yml")]) cmd.extend(args) env = os.environ.copy() if project_dir: env["COMPOSE_PROJECT_NAME"] = "ghydramcp" return subprocess.run( cmd, check=check, capture_output=capture, text=True, cwd=project_dir, env=env, ) def _generate_container_name(self, binary_name: str) -> str: """Generate a unique container name for this session. Format: ghydramcp-{session_id}-{binary_stem} Args: binary_name: Name of the binary being analyzed Returns: Unique container name """ # Clean binary name for container naming stem = Path(binary_name).stem.lower() clean_name = "".join(c if c.isalnum() else "-" for c in stem)[:20] return f"ghydramcp-{self.session_id}-{clean_name}" def _get_container_labels(self, binary_path: str, port: int) -> Dict[str, str]: """Generate Docker labels for a container. Labels are used for tracking and filtering containers across processes. Args: binary_path: Path to the binary being analyzed port: Port the container is using Returns: Dict of label key-value pairs """ return { f"{self.LABEL_PREFIX}.session": self.session_id, f"{self.LABEL_PREFIX}.port": str(port), f"{self.LABEL_PREFIX}.binary": str(binary_path), f"{self.LABEL_PREFIX}.started": str(int(time.time())), f"{self.LABEL_PREFIX}.pid": str(os.getpid()), } async def _find_containers_by_label( self, label_filter: Optional[str] = None, session_only: bool = False, ) -> List[Dict[str, Any]]: """Find GhydraMCP containers by label. Args: label_filter: Additional label filter (e.g., "port=8192") session_only: Only return containers from this session Returns: List of container info dicts """ containers = [] try: filter_args = ["--filter", f"label={self.LABEL_PREFIX}.session"] if session_only: filter_args = ["--filter", f"label={self.LABEL_PREFIX}.session={self.session_id}"] if label_filter: filter_args.extend(["--filter", f"label={self.LABEL_PREFIX}.{label_filter}"]) ps_result = await self._run_docker_cmd( [ "ps", "-a", *filter_args, "--format", "{{.ID}}\t{{.Names}}\t{{.Status}}\t{{.Ports}}\t{{.Label \"" + self.LABEL_PREFIX + ".session\"}}\t{{.Label \"" + self.LABEL_PREFIX + ".port\"}}\t{{.Label \"" + self.LABEL_PREFIX + ".binary\"}}", ], check=False, ) for line in ps_result.stdout.strip().split("\n"): if line: parts = line.split("\t") if len(parts) >= 4: containers.append({ "id": parts[0], "name": parts[1], "status": parts[2], "ports": parts[3], "session": parts[4] if len(parts) > 4 else "", "port": int(parts[5]) if len(parts) > 5 and parts[5].isdigit() else None, "binary": parts[6] if len(parts) > 6 else "", }) except subprocess.CalledProcessError: pass return containers @mcp_tool( name="docker_status", description="Check Docker availability and running GhydraMCP containers", ) async def docker_status(self, ctx: Optional[Context] = None) -> Dict[str, Any]: """Check Docker status and list running GhydraMCP containers. Returns: Status information including: - docker_available: Whether Docker is installed - docker_running: Whether Docker daemon is running - session_id: This MCP instance's session ID - containers: List of GhydraMCP containers with their status - port_pool: Port allocation status - images: Available GhydraMCP images """ result = { "docker_available": False, "docker_running": False, "session_id": self.session_id, "containers": [], "session_containers": [], "port_pool": { "range": f"{PORT_POOL_START}-{PORT_POOL_END}", "allocated": {}, }, "images": [], "compose_available": False, } # Check if docker is installed if not self._check_docker_available(): return result result["docker_available"] = True # Check if docker daemon is running try: await self._run_docker_cmd(["info"], check=True) result["docker_running"] = True except (subprocess.CalledProcessError, FileNotFoundError): return result # Check for docker compose try: await self._run_docker_cmd(["compose", "version"], check=True) result["compose_available"] = True except subprocess.CalledProcessError: pass # List all GhydraMCP containers (from any session) result["containers"] = await self._find_containers_by_label() # List containers from this session only result["session_containers"] = await self._find_containers_by_label(session_only=True) # Get port pool status if self._port_pool: result["port_pool"]["allocated"] = self._port_pool.get_allocated_ports() # Also check by name pattern for containers without labels try: ps_result = await self._run_docker_cmd( [ "ps", "-a", "--filter", "name=ghydramcp", "--format", "{{.ID}}\t{{.Names}}\t{{.Status}}\t{{.Ports}}", ] ) existing_ids = {c["id"] for c in result["containers"]} for line in ps_result.stdout.strip().split("\n"): if line: parts = line.split("\t") if len(parts) >= 3 and parts[0] not in existing_ids: result["containers"].append( { "id": parts[0], "name": parts[1], "status": parts[2], "ports": parts[3] if len(parts) > 3 else "", "session": "legacy", # No session label } ) except subprocess.CalledProcessError: pass # List GhydraMCP images try: images_result = await self._run_docker_cmd( [ "images", "--filter", "reference=ghydramcp*", "--format", "{{.Repository}}:{{.Tag}}\t{{.Size}}\t{{.CreatedSince}}", ] ) for line in images_result.stdout.strip().split("\n"): if line: parts = line.split("\t") if len(parts) >= 2: result["images"].append( { "name": parts[0], "size": parts[1], "created": parts[2] if len(parts) > 2 else "", } ) except subprocess.CalledProcessError: pass return result @mcp_tool( name="docker_start", description="Start a GhydraMCP Docker container to analyze a binary (auto-assigns port from pool)", ) async def docker_start( self, binary_path: str, memory: str = "2G", name: Optional[str] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Start a GhydraMCP Docker container for binary analysis. This creates a new Ghidra instance in Docker with the GhydraMCP plugin pre-installed. The binary will be imported and analyzed, then the HTTP API will be available. Ports are automatically allocated from the pool (8192-8223) to prevent conflicts between concurrent sessions. Container names are auto-generated with the session ID to ensure uniqueness. Args: binary_path: Path to the binary file to analyze memory: Max JVM heap memory (default: 2G) name: Container name (auto-generated if not specified) Returns: Container info including ID, name, port, and API URL """ if not self._check_docker_available(): return {"error": "Docker is not available on this system"} # Verify binary exists binary_file = Path(binary_path).resolve() if not binary_file.exists(): return {"error": f"Binary not found: {binary_path}"} # Always allocate from pool to prevent conflicts between sessions port = self._port_pool.allocate(self.session_id) if port is None: return { "error": "Port pool exhausted (8192-8223). Stop some containers first.", "allocated_ports": self._port_pool.get_allocated_ports(), } # Generate container name if not specified if name is None: name = self._generate_container_name(binary_file.name) # Clean up invalid characters in container name name = "".join(c if c.isalnum() or c in "-_" else "-" for c in name) try: # Check if container with this name already exists check_result = await self._run_docker_cmd( ["ps", "-a", "-q", "-f", f"name=^{name}$"], check=False ) if check_result.stdout.strip(): self._port_pool.release(port) return { "error": f"Container '{name}' already exists. Stop it first with docker_stop." } # Check if port is already in use by a non-pool container port_check = await self._run_docker_cmd( ["ps", "-q", "-f", f"publish={port}"], check=False ) if port_check.stdout.strip(): self._port_pool.release(port) return { "error": f"Port {port} is already in use by another container" } # Build label arguments labels = self._get_container_labels(str(binary_file), port) label_args = [] for k, v in labels.items(): label_args.extend(["-l", f"{k}={v}"]) # Start the container run_result = await self._run_docker_cmd( [ "run", "-d", "--name", name, "-p", f"{port}:8192", "-v", f"{binary_file.parent}:/binaries:ro", "-e", f"GHYDRA_MAXMEM={memory}", *label_args, "ghydramcp:latest", f"/binaries/{binary_file.name}", ] ) container_id = run_result.stdout.strip() # Track the container in this session self._session_containers[container_id] = { "name": name, "port": port, "binary": str(binary_file), "memory": memory, } return { "success": True, "session_id": self.session_id, "container_id": container_id[:12], "name": name, "port": port, "api_url": f"http://localhost:{port}/", "binary": str(binary_file), "message": ( f"Container started on port {port}. Analysis in progress. " f"API will be available at http://localhost:{port}/ once analysis completes. " f"Use docker_logs('{name}') to monitor progress." ), } except subprocess.CalledProcessError as e: self._port_pool.release(port) return {"error": f"Failed to start container: {e.stderr or e.stdout}"} @mcp_tool( name="docker_stop", description="Stop a running GhydraMCP Docker container", ) async def docker_stop( self, name_or_id: str, remove: bool = True, ctx: Optional[Context] = None ) -> Dict[str, Any]: """Stop a GhydraMCP Docker container. For safety, this will only stop containers that belong to the current MCP session. Attempting to stop another session's container will fail with an error explaining whose container it is. Args: name_or_id: Container name or ID remove: Also remove the container (default: True) Returns: Status message """ if not self._check_docker_available(): return {"error": "Docker is not available on this system"} # Get container's session and port labels for validation container_port = None container_session = None try: inspect_result = await self._run_docker_cmd( [ "inspect", "--format", "{{index .Config.Labels \"" + self.LABEL_PREFIX + ".port\"}}|{{index .Config.Labels \"" + self.LABEL_PREFIX + ".session\"}}", name_or_id, ], check=False, ) parts = inspect_result.stdout.strip().split("|") if len(parts) >= 2: if parts[0].isdigit(): container_port = int(parts[0]) container_session = parts[1] if parts[1] else None except Exception: pass # Session validation: only allow stopping own containers if container_session and container_session != self.session_id: return { "error": f"Cannot stop container '{name_or_id}' - it belongs to session '{container_session}', not this session '{self.session_id}'.", "hint": "Each MCP session can only stop its own containers for safety.", } try: # Stop the container await self._run_docker_cmd(["stop", name_or_id]) if remove: await self._run_docker_cmd(["rm", name_or_id]) # Release the port back to the pool if container_port: self._port_pool.release(container_port) # Remove from session tracking self._session_containers = { k: v for k, v in self._session_containers.items() if not (k.startswith(name_or_id) or v.get("name") == name_or_id) } return { "success": True, "message": f"Container '{name_or_id}' stopped and removed", "port_released": container_port, } else: return {"success": True, "message": f"Container '{name_or_id}' stopped"} except subprocess.CalledProcessError as e: return {"error": f"Failed to stop container: {e.stderr or e.stdout}"} @mcp_tool( name="docker_logs", description="Get logs from a GhydraMCP Docker container", ) async def docker_logs( self, name_or_id: str, tail: int = 100, follow: bool = False, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Get logs from a GhydraMCP container. Args: name_or_id: Container name or ID tail: Number of lines to show (default: 100) follow: Whether to follow log output (not recommended for MCP) Returns: Container logs """ if not self._check_docker_available(): return {"error": "Docker is not available on this system"} try: args = ["logs", "--tail", str(tail)] if follow: args.append("-f") args.append(name_or_id) result = await self._run_docker_cmd(args) return { "success": True, "container": name_or_id, "logs": result.stdout + result.stderr, } except subprocess.CalledProcessError as e: return {"error": f"Failed to get logs: {e.stderr or e.stdout}"} @mcp_tool( name="docker_build", description="Build the GhydraMCP Docker image from source", ) async def docker_build( self, tag: str = "latest", no_cache: bool = False, project_dir: Optional[str] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Build the GhydraMCP Docker image. Args: tag: Image tag (default: 'latest') no_cache: Build without using cache project_dir: Path to GhydraMCP project (auto-detected if not specified) Returns: Build status """ if not self._check_docker_available(): return {"error": "Docker is not available on this system"} # Find project directory if project_dir: proj_path = Path(project_dir) else: # Try to find docker/Dockerfile relative to this file module_dir = Path(__file__).parent.parent.parent.parent if (module_dir / "docker" / "Dockerfile").exists(): proj_path = module_dir else: return { "error": "Could not find GhydraMCP project directory. Please specify project_dir." } dockerfile = proj_path / "docker" / "Dockerfile" if not dockerfile.exists(): return {"error": f"Dockerfile not found at {dockerfile}"} try: args = [ "build", "-t", f"ghydramcp:{tag}", "-f", str(dockerfile), ] if no_cache: args.append("--no-cache") args.append(str(proj_path)) # Run build (this can take a while) result = await self._run_docker_cmd(args, capture=True) return { "success": True, "image": f"ghydramcp:{tag}", "message": f"Successfully built ghydramcp:{tag}", "output": result.stdout[-2000:] if len(result.stdout) > 2000 else result.stdout, } except subprocess.CalledProcessError as e: return {"error": f"Build failed: {e.stderr or e.stdout}"} def _sync_health_check(self, port: int, timeout: float) -> Dict[str, Any]: """Synchronous health check (runs in thread to avoid blocking event loop). Args: port: API port to check timeout: Request timeout in seconds Returns: Health status dict """ import json as json_module import urllib.error import urllib.request url = f"http://localhost:{port}/" try: req = urllib.request.Request(url) with urllib.request.urlopen(req, timeout=timeout) as response: data = json_module.loads(response.read().decode()) return { "healthy": True, "port": port, "api_version": data.get("api_version"), "program": data.get("program"), "file": data.get("file"), } except urllib.error.URLError as e: return { "healthy": False, "port": port, "error": str(e.reason), "message": "Container may still be starting or analyzing binary", } except Exception as e: return { "healthy": False, "port": port, "error": str(e), } @mcp_tool( name="docker_health", description="Check if a GhydraMCP container's API is responding", ) async def docker_health( self, port: int = 8192, timeout: float = 5.0, ctx: Optional[Context] = None ) -> Dict[str, Any]: """Check if a GhydraMCP container's API is healthy. Args: port: API port to check (default: 8192) timeout: Request timeout in seconds Returns: Health status and API info if available """ loop = asyncio.get_event_loop() return await loop.run_in_executor( None, self._sync_health_check, port, timeout ) @mcp_tool( name="docker_auto_start", description="Automatically start a GhydraMCP container with dynamic port allocation", ) async def docker_auto_start( self, binary_path: str, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Automatically start a Docker container with intelligent port allocation. This is the main entry point for automatic Docker management: 1. Checks if a Ghidra instance with the SAME binary is already running 2. If not, allocates a port from the pool and starts a new container 3. Returns connection info immediately Ports are auto-allocated from the pool (8192-8223) to prevent conflicts between concurrent sessions. After starting, poll docker_health(port) in a loop to check readiness. This gives you visibility into progress and ability to check logs. Args: binary_path: Path to the binary to analyze Returns: Instance connection info with session ID and port details. Poll docker_health(port) to check when container is ready. """ import os requested_name = os.path.basename(binary_path) def _is_same_binary(health_program: str) -> bool: """Check if a running instance has the same binary loaded.""" if not health_program: return False return os.path.basename(health_program) == requested_name # Check all pooled ports for an instance with the SAME binary for check_port in range(PORT_POOL_START, PORT_POOL_END + 1): health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx) if health.get("healthy") and _is_same_binary(health.get("program", "")): return { "source": "existing", "session_id": self.session_id, "port": check_port, "api_url": f"http://localhost:{check_port}/", "program": health.get("program"), "message": f"Found existing Ghidra instance on port {check_port}", } # Check if Docker is available status = await self.docker_status(ctx=ctx) if not status.get("docker_running"): return { "error": "Docker is not available. Please install Docker or start Ghidra manually." } # Check if we have the image if not any("ghydramcp" in img.get("name", "") for img in status.get("images", [])): return { "error": ( "GhydraMCP Docker image not found. " "Build it with docker_build() or 'make build' first." ) } # Start a new container (port auto-allocated from pool) start_result = await self.docker_start( binary_path=binary_path, ctx=ctx ) if not start_result.get("success"): return start_result actual_port = start_result.get("port") return { "source": "docker", "session_id": self.session_id, "container_id": start_result.get("container_id"), "container_name": start_result.get("name"), "port": actual_port, "api_url": f"http://localhost:{actual_port}/", "message": f"Container starting on port {actual_port}. Poll docker_health(port={actual_port}) to check when ready.", } @mcp_tool( name="docker_cleanup", description="Clean up orphaned containers and stale port locks", ) async def docker_cleanup( self, session_only: bool = True, max_age_hours: float = 24.0, dry_run: bool = False, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Clean up orphaned GhydraMCP containers and stale port locks. This helps recover from crashed processes that left containers or port locks behind. By default, only cleans containers from the current session to prevent accidentally removing another agent's work. Set session_only=False (with caution) to clean all GhydraMCP containers. Args: session_only: Only clean up containers from this session (default: True for safety) max_age_hours: Max age for orphaned containers (default: 24 hours) dry_run: If True, only report what would be cleaned up Returns: Cleanup report with containers and ports cleaned """ if not self._check_docker_available(): return {"error": "Docker is not available on this system"} result = { "dry_run": dry_run, "containers_cleaned": [], "ports_cleaned": [], "errors": [], } # Find orphaned containers containers = await self._find_containers_by_label(session_only=session_only) for container in containers: # Check if container is old enough to be considered orphaned try: inspect_result = await self._run_docker_cmd( ["inspect", "--format", "{{index .Config.Labels \"" + self.LABEL_PREFIX + ".started\"}}", container["id"]], check=False, ) started_ts = inspect_result.stdout.strip() if started_ts.isdigit(): age_hours = (time.time() - int(started_ts)) / 3600 if age_hours > max_age_hours: if dry_run: result["containers_cleaned"].append({ "id": container["id"], "name": container["name"], "age_hours": round(age_hours, 1), "would_remove": True, }) else: try: await self.docker_stop(container["id"], remove=True, ctx=ctx) result["containers_cleaned"].append({ "id": container["id"], "name": container["name"], "age_hours": round(age_hours, 1), "removed": True, }) except Exception as e: result["errors"].append(f"Failed to remove {container['id']}: {e}") except Exception: pass # Clean up stale port locks if self._port_pool: stale_ports = self._port_pool.cleanup_stale_locks(max_age_hours * 3600) result["ports_cleaned"] = stale_ports return result @mcp_tool( name="docker_session_info", description="Get information about this MCP session's containers", ) async def docker_session_info( self, ctx: Optional[Context] = None ) -> Dict[str, Any]: """Get information about containers and ports for this MCP session. Returns: Session info including: - session_id: This session's unique identifier - containers: Containers started by this session - allocated_ports: Ports allocated to this session """ return { "session_id": self.session_id, "containers": self._session_containers, "allocated_ports": { port: info for port, info in self._port_pool.get_allocated_ports().items() if info.get("session_id") == self.session_id } if self._port_pool else {}, "port_pool_range": f"{PORT_POOL_START}-{PORT_POOL_END}", }