diff --git a/src/ghydramcp/mixins/docker.py b/src/ghydramcp/mixins/docker.py index 2c8d1a2..51add07 100644 --- a/src/ghydramcp/mixins/docker.py +++ b/src/ghydramcp/mixins/docker.py @@ -2,14 +2,21 @@ Provides tools for managing Ghidra Docker containers programmatically. Allows the MCP server to automatically start containers when Ghidra isn't available. + +Supports dynamic container provisioning with port pooling to avoid contention +when multiple Claude processes share the same MCP server. """ import asyncio +import fcntl +import json import os import shutil import subprocess +import time +import uuid from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Set from fastmcp import Context from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool @@ -17,19 +24,220 @@ from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool from ..config import get_config, get_docker_config +# Port pool configuration +PORT_POOL_START = 8192 +PORT_POOL_END = 8199 +PORT_LOCK_DIR = Path("/tmp/ghydramcp-ports") + + +class PortPool: + """Manages a pool of ports for GhydraMCP containers. + + Uses file-based locking to coordinate port allocation across multiple + processes. Each allocated port gets a lock file that persists until + the container is stopped. + """ + + def __init__( + self, + start: int = PORT_POOL_START, + end: int = PORT_POOL_END, + lock_dir: Path = PORT_LOCK_DIR, + ): + self.start = start + self.end = end + self.lock_dir = lock_dir + self.lock_dir.mkdir(parents=True, exist_ok=True) + + def _lock_file(self, port: int) -> Path: + """Get the lock file path for a port.""" + return self.lock_dir / f"port-{port}.lock" + + def _try_acquire_port(self, port: int, session_id: str) -> bool: + """Try to acquire a specific port. + + Uses flock for cross-process synchronization. + + Args: + port: Port number to acquire + session_id: Session ID to associate with the port + + Returns: + True if port was acquired, False if already in use + """ + lock_path = self._lock_file(port) + + try: + # Open or create the lock file + fd = os.open(str(lock_path), os.O_CREAT | os.O_RDWR, 0o644) + + # Try to get an exclusive lock (non-blocking) + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + except (IOError, OSError): + # Port is locked by another process + os.close(fd) + return False + + # Write session info to the lock file + os.ftruncate(fd, 0) + os.lseek(fd, 0, os.SEEK_SET) + lock_data = json.dumps({ + "session_id": session_id, + "pid": os.getpid(), + "timestamp": time.time(), + }) + os.write(fd, lock_data.encode()) + + # Keep the file descriptor open to maintain the lock + # Store it so we can release later + if not hasattr(self, '_lock_fds'): + self._lock_fds = {} + self._lock_fds[port] = fd + + return True + + except Exception: + return False + + def allocate(self, session_id: str) -> Optional[int]: + """Allocate an available port from the pool. + + Args: + session_id: Session ID to associate with the allocation + + Returns: + Allocated port number, or None if pool exhausted + """ + for port in range(self.start, self.end + 1): + if self._try_acquire_port(port, session_id): + return port + return None + + def release(self, port: int) -> bool: + """Release a port back to the pool. + + Args: + port: Port number to release + + Returns: + True if released, False if not held + """ + if not hasattr(self, '_lock_fds') or port not in self._lock_fds: + return False + + try: + fd = self._lock_fds.pop(port) + fcntl.flock(fd, fcntl.LOCK_UN) + os.close(fd) + + # Remove the lock file + lock_path = self._lock_file(port) + if lock_path.exists(): + lock_path.unlink() + + return True + except Exception: + return False + + def get_allocated_ports(self) -> Dict[int, Dict[str, Any]]: + """Get info about all currently allocated ports. + + Returns: + Dict mapping port numbers to their allocation info + """ + allocated = {} + + for port in range(self.start, self.end + 1): + lock_path = self._lock_file(port) + if lock_path.exists(): + try: + with open(lock_path, 'r') as f: + data = json.load(f) + allocated[port] = data + except (json.JSONDecodeError, IOError): + # Lock file exists but can't be read - port is likely in use + allocated[port] = {"session_id": "unknown", "status": "locked"} + + return allocated + + def cleanup_stale_locks(self, max_age_seconds: float = 3600) -> List[int]: + """Clean up stale lock files from crashed processes. + + Args: + max_age_seconds: Max age for a lock file to be considered stale + + Returns: + List of ports that were cleaned up + """ + cleaned = [] + + for port in range(self.start, self.end + 1): + lock_path = self._lock_file(port) + if not lock_path.exists(): + continue + + try: + # Check if the lock file is stale + mtime = lock_path.stat().st_mtime + age = time.time() - mtime + + if age > max_age_seconds: + # Try to acquire the lock - if we can, the owning process is gone + fd = os.open(str(lock_path), os.O_RDWR) + try: + fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + # We got the lock, so the original holder is gone + os.close(fd) + lock_path.unlink() + cleaned.append(port) + except (IOError, OSError): + # Still locked by another process + os.close(fd) + except Exception: + pass + + return cleaned + + class DockerMixin(MCPMixin): """Docker container management for GhydraMCP. Provides tools to start, stop, and manage Ghidra containers with the GhydraMCP plugin pre-installed. + + Supports multi-process environments with: + - Dynamic port allocation from a pool (8192-8199) + - Session-scoped container naming with UUIDs + - Docker label-based tracking for cross-process visibility + - Automatic cleanup of orphaned containers """ - # Track running containers - _containers: Dict[str, Dict[str, Any]] = {} + # Session ID for this MCP server instance + _session_id: Optional[str] = None + + # Port pool for dynamic allocation + _port_pool: Optional[PortPool] = None + + # Track containers started by this session + _session_containers: Dict[str, Dict[str, Any]] = {} + + # Label prefix for GhydraMCP containers + LABEL_PREFIX = "com.ghydramcp" def __init__(self): - """Initialize Docker mixin.""" + """Initialize Docker mixin with session isolation.""" self._check_docker_available() + self._session_id = str(uuid.uuid4())[:8] + self._port_pool = PortPool() + self._session_containers = {} + + @property + def session_id(self) -> str: + """Get the session ID for this MCP instance.""" + if self._session_id is None: + self._session_id = str(uuid.uuid4())[:8] + return self._session_id def _check_docker_available(self) -> bool: """Check if Docker is available on the system.""" @@ -95,6 +303,93 @@ class DockerMixin(MCPMixin): env=env, ) + def _generate_container_name(self, binary_name: str) -> str: + """Generate a unique container name for this session. + + Format: ghydramcp-{session_id}-{binary_stem} + + Args: + binary_name: Name of the binary being analyzed + + Returns: + Unique container name + """ + # Clean binary name for container naming + stem = Path(binary_name).stem.lower() + clean_name = "".join(c if c.isalnum() else "-" for c in stem)[:20] + return f"ghydramcp-{self.session_id}-{clean_name}" + + def _get_container_labels(self, binary_path: str, port: int) -> Dict[str, str]: + """Generate Docker labels for a container. + + Labels are used for tracking and filtering containers across processes. + + Args: + binary_path: Path to the binary being analyzed + port: Port the container is using + + Returns: + Dict of label key-value pairs + """ + return { + f"{self.LABEL_PREFIX}.session": self.session_id, + f"{self.LABEL_PREFIX}.port": str(port), + f"{self.LABEL_PREFIX}.binary": str(binary_path), + f"{self.LABEL_PREFIX}.started": str(int(time.time())), + f"{self.LABEL_PREFIX}.pid": str(os.getpid()), + } + + def _find_containers_by_label( + self, + label_filter: Optional[str] = None, + session_only: bool = False, + ) -> List[Dict[str, Any]]: + """Find GhydraMCP containers by label. + + Args: + label_filter: Additional label filter (e.g., "port=8192") + session_only: Only return containers from this session + + Returns: + List of container info dicts + """ + containers = [] + + try: + filter_args = ["--filter", f"label={self.LABEL_PREFIX}.session"] + if session_only: + filter_args = ["--filter", f"label={self.LABEL_PREFIX}.session={self.session_id}"] + if label_filter: + filter_args.extend(["--filter", f"label={self.LABEL_PREFIX}.{label_filter}"]) + + ps_result = self._run_docker_cmd( + [ + "ps", "-a", + *filter_args, + "--format", + "{{.ID}}\t{{.Names}}\t{{.Status}}\t{{.Ports}}\t{{.Label \"" + self.LABEL_PREFIX + ".session\"}}\t{{.Label \"" + self.LABEL_PREFIX + ".port\"}}\t{{.Label \"" + self.LABEL_PREFIX + ".binary\"}}", + ], + check=False, + ) + + for line in ps_result.stdout.strip().split("\n"): + if line: + parts = line.split("\t") + if len(parts) >= 4: + containers.append({ + "id": parts[0], + "name": parts[1], + "status": parts[2], + "ports": parts[3], + "session": parts[4] if len(parts) > 4 else "", + "port": int(parts[5]) if len(parts) > 5 and parts[5].isdigit() else None, + "binary": parts[6] if len(parts) > 6 else "", + }) + except subprocess.CalledProcessError: + pass + + return containers + @mcp_tool( name="docker_status", description="Check Docker availability and running GhydraMCP containers", @@ -106,13 +401,21 @@ class DockerMixin(MCPMixin): Status information including: - docker_available: Whether Docker is installed - docker_running: Whether Docker daemon is running + - session_id: This MCP instance's session ID - containers: List of GhydraMCP containers with their status + - port_pool: Port allocation status - images: Available GhydraMCP images """ result = { "docker_available": False, "docker_running": False, + "session_id": self.session_id, "containers": [], + "session_containers": [], + "port_pool": { + "range": f"{PORT_POOL_START}-{PORT_POOL_END}", + "allocated": {}, + }, "images": [], "compose_available": False, } @@ -137,34 +440,17 @@ class DockerMixin(MCPMixin): except subprocess.CalledProcessError: pass - # List GhydraMCP containers - try: - ps_result = self._run_docker_cmd( - [ - "ps", - "-a", - "--filter", - "label=org.opencontainers.image.title=ghydramcp", - "--format", - "{{.ID}}\t{{.Names}}\t{{.Status}}\t{{.Ports}}", - ] - ) - for line in ps_result.stdout.strip().split("\n"): - if line: - parts = line.split("\t") - if len(parts) >= 3: - result["containers"].append( - { - "id": parts[0], - "name": parts[1], - "status": parts[2], - "ports": parts[3] if len(parts) > 3 else "", - } - ) - except subprocess.CalledProcessError: - pass + # List all GhydraMCP containers (from any session) + result["containers"] = self._find_containers_by_label() - # Also check by name pattern + # List containers from this session only + result["session_containers"] = self._find_containers_by_label(session_only=True) + + # Get port pool status + if self._port_pool: + result["port_pool"]["allocated"] = self._port_pool.get_allocated_ports() + + # Also check by name pattern for containers without labels try: ps_result = self._run_docker_cmd( [ @@ -187,6 +473,7 @@ class DockerMixin(MCPMixin): "name": parts[1], "status": parts[2], "ports": parts[3] if len(parts) > 3 else "", + "session": "legacy", # No session label } ) except subprocess.CalledProcessError: @@ -221,12 +508,12 @@ class DockerMixin(MCPMixin): @mcp_tool( name="docker_start", - description="Start a GhydraMCP Docker container to analyze a binary", + description="Start a GhydraMCP Docker container to analyze a binary (auto-assigns port from pool)", ) async def docker_start( self, binary_path: str, - port: int = 8192, + port: Optional[int] = None, memory: str = "2G", name: Optional[str] = None, ctx: Optional[Context] = None, @@ -237,14 +524,18 @@ class DockerMixin(MCPMixin): plugin pre-installed. The binary will be imported and analyzed, then the HTTP API will be available on the specified port. + If no port is specified, one will be automatically allocated from + the pool (8192-8199). Container names are auto-generated with the + session ID to ensure uniqueness across processes. + Args: binary_path: Path to the binary file to analyze - port: Port to expose the HTTP API (default: 8192) + port: Port to expose the HTTP API (auto-allocated if not specified) memory: Max JVM heap memory (default: 2G) name: Container name (auto-generated if not specified) Returns: - Container info including ID, name, and API URL + Container info including ID, name, port, and API URL """ if not self._check_docker_available(): return {"error": "Docker is not available on this system"} @@ -254,9 +545,20 @@ class DockerMixin(MCPMixin): if not binary_file.exists(): return {"error": f"Binary not found: {binary_path}"} + # Allocate port from pool if not specified + allocated_port = False + if port is None: + port = self._port_pool.allocate(self.session_id) + if port is None: + return { + "error": "Port pool exhausted (8192-8199). Stop some containers first.", + "allocated_ports": self._port_pool.get_allocated_ports(), + } + allocated_port = True + # Generate container name if not specified if name is None: - name = f"ghydramcp-{binary_file.stem}-{port}" + name = self._generate_container_name(binary_file.name) # Clean up invalid characters in container name name = "".join(c if c.isalnum() or c in "-_" else "-" for c in name) @@ -264,22 +566,32 @@ class DockerMixin(MCPMixin): try: # Check if container with this name already exists check_result = self._run_docker_cmd( - ["ps", "-a", "-q", "-f", f"name={name}"], check=False + ["ps", "-a", "-q", "-f", f"name=^{name}$"], check=False ) if check_result.stdout.strip(): + if allocated_port: + self._port_pool.release(port) return { "error": f"Container '{name}' already exists. Stop it first with docker_stop." } - # Check if port is already in use + # Check if port is already in use by a non-pool container port_check = self._run_docker_cmd( ["ps", "-q", "-f", f"publish={port}"], check=False ) if port_check.stdout.strip(): + if allocated_port: + self._port_pool.release(port) return { "error": f"Port {port} is already in use by another container" } + # Build label arguments + labels = self._get_container_labels(str(binary_file), port) + label_args = [] + for k, v in labels.items(): + label_args.extend(["-l", f"{k}={v}"]) + # Start the container run_result = self._run_docker_cmd( [ @@ -293,6 +605,7 @@ class DockerMixin(MCPMixin): f"{binary_file.parent}:/binaries:ro", "-e", f"GHYDRA_MAXMEM={memory}", + *label_args, "ghydramcp:latest", f"/binaries/{binary_file.name}", ] @@ -300,29 +613,33 @@ class DockerMixin(MCPMixin): container_id = run_result.stdout.strip() - # Track the container - self._containers[container_id] = { + # Track the container in this session + self._session_containers[container_id] = { "name": name, "port": port, "binary": str(binary_file), "memory": memory, + "allocated_port": allocated_port, } return { "success": True, + "session_id": self.session_id, "container_id": container_id[:12], "name": name, "port": port, "api_url": f"http://localhost:{port}/", "binary": str(binary_file), "message": ( - f"Container started. Analysis in progress. " + f"Container started on port {port}. Analysis in progress. " f"API will be available at http://localhost:{port}/ once analysis completes. " f"Use docker_logs('{name}') to monitor progress." ), } except subprocess.CalledProcessError as e: + if allocated_port: + self._port_pool.release(port) return {"error": f"Failed to start container: {e.stderr or e.stdout}"} @mcp_tool( @@ -344,19 +661,40 @@ class DockerMixin(MCPMixin): if not self._check_docker_available(): return {"error": "Docker is not available on this system"} + # Find the container to get its port for pool release + container_port = None + try: + inspect_result = self._run_docker_cmd( + ["inspect", "--format", "{{index .Config.Labels \"" + self.LABEL_PREFIX + ".port\"}}", name_or_id], + check=False, + ) + if inspect_result.stdout.strip().isdigit(): + container_port = int(inspect_result.stdout.strip()) + except Exception: + pass + try: # Stop the container self._run_docker_cmd(["stop", name_or_id]) if remove: self._run_docker_cmd(["rm", name_or_id]) - # Remove from tracking - self._containers = { + + # Release the port back to the pool + if container_port: + self._port_pool.release(container_port) + + # Remove from session tracking + self._session_containers = { k: v - for k, v in self._containers.items() + for k, v in self._session_containers.items() if not (k.startswith(name_or_id) or v.get("name") == name_or_id) } - return {"success": True, "message": f"Container '{name_or_id}' stopped and removed"} + return { + "success": True, + "message": f"Container '{name_or_id}' stopped and removed", + "port_released": container_port, + } else: return {"success": True, "message": f"Container '{name_or_id}' stopped"} @@ -487,14 +825,14 @@ class DockerMixin(MCPMixin): """ import urllib.request import urllib.error - import json + import json as json_module url = f"http://localhost:{port}/" try: req = urllib.request.Request(url) with urllib.request.urlopen(req, timeout=timeout) as response: - data = json.loads(response.read().decode()) + data = json_module.loads(response.read().decode()) return { "healthy": True, "port": port, @@ -539,8 +877,6 @@ class DockerMixin(MCPMixin): Returns: Health status once healthy, or error on timeout """ - import time - start_time = time.time() last_error = None @@ -561,43 +897,62 @@ class DockerMixin(MCPMixin): @mcp_tool( name="docker_auto_start", - description="Automatically start a GhydraMCP container if no Ghidra instance is available", + description="Automatically start a GhydraMCP container with dynamic port allocation", ) async def docker_auto_start( self, binary_path: str, - port: int = 8192, + port: Optional[int] = None, wait: bool = True, timeout: float = 300.0, ctx: Optional[Context] = None, ) -> Dict[str, Any]: - """Automatically start a Docker container if no Ghidra instance is available. + """Automatically start a Docker container with intelligent port allocation. This is the main entry point for automatic Docker management: - 1. Checks if a Ghidra instance is already running on the port - 2. If not, starts a new Docker container + 1. Checks if a Ghidra instance is already running (on specified or any pooled port) + 2. If not, allocates a port from the pool and starts a new container 3. Optionally waits for the container to become healthy 4. Returns connection info for the instance + When port is not specified, the system will: + - First check all pooled ports (8192-8199) for an existing healthy instance + - If none found, allocate a new port from the pool + Args: binary_path: Path to the binary to analyze - port: Port for the HTTP API (default: 8192) + port: Specific port for the HTTP API (auto-allocated if not specified) wait: Wait for container to be ready (default: True) timeout: Max wait time in seconds (default: 300) Returns: - Instance connection info + Instance connection info with session ID and port details """ - # First, check if there's already a Ghidra instance on this port - health = await self.docker_health(port=port, ctx=ctx) - if health.get("healthy"): - return { - "source": "existing", - "port": port, - "api_url": f"http://localhost:{port}/", - "program": health.get("program"), - "message": "Using existing Ghidra instance", - } + # If port is specified, check that specific port + if port is not None: + health = await self.docker_health(port=port, ctx=ctx) + if health.get("healthy"): + return { + "source": "existing", + "session_id": self.session_id, + "port": port, + "api_url": f"http://localhost:{port}/", + "program": health.get("program"), + "message": "Using existing Ghidra instance", + } + else: + # Check all pooled ports for an existing instance + for check_port in range(PORT_POOL_START, PORT_POOL_END + 1): + health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx) + if health.get("healthy"): + return { + "source": "existing", + "session_id": self.session_id, + "port": check_port, + "api_url": f"http://localhost:{check_port}/", + "program": health.get("program"), + "message": f"Found existing Ghidra instance on port {check_port}", + } # Check if Docker is available status = await self.docker_status(ctx=ctx) @@ -615,7 +970,7 @@ class DockerMixin(MCPMixin): ) } - # Start a new container + # Start a new container (port will be auto-allocated if not specified) start_result = await self.docker_start( binary_path=binary_path, port=port, ctx=ctx ) @@ -623,34 +978,141 @@ class DockerMixin(MCPMixin): if not start_result.get("success"): return start_result + actual_port = start_result.get("port") + if wait: # Wait for the container to become healthy - wait_result = await self.docker_wait(port=port, timeout=timeout, ctx=ctx) + wait_result = await self.docker_wait(port=actual_port, timeout=timeout, ctx=ctx) if wait_result.get("healthy"): return { "source": "docker", + "session_id": self.session_id, "container_id": start_result.get("container_id"), "container_name": start_result.get("name"), - "port": port, - "api_url": f"http://localhost:{port}/", + "port": actual_port, + "api_url": f"http://localhost:{actual_port}/", "program": wait_result.get("program"), "waited_seconds": wait_result.get("waited_seconds"), - "message": f"Docker container ready after {wait_result.get('waited_seconds')}s", + "message": f"Docker container ready on port {actual_port} after {wait_result.get('waited_seconds')}s", } else: return { "warning": "Container started but not yet healthy", + "session_id": self.session_id, "container_id": start_result.get("container_id"), - "port": port, + "port": actual_port, "last_error": wait_result.get("error"), "message": "Container may still be analyzing. Check docker_logs() for progress.", } return { "source": "docker", + "session_id": self.session_id, "container_id": start_result.get("container_id"), "container_name": start_result.get("name"), - "port": port, - "api_url": f"http://localhost:{port}/", - "message": "Container starting. Use docker_wait() or docker_health() to check status.", + "port": actual_port, + "api_url": f"http://localhost:{actual_port}/", + "message": f"Container starting on port {actual_port}. Use docker_wait() or docker_health() to check status.", + } + + @mcp_tool( + name="docker_cleanup", + description="Clean up orphaned containers and stale port locks", + ) + async def docker_cleanup( + self, + session_only: bool = False, + max_age_hours: float = 24.0, + dry_run: bool = False, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """Clean up orphaned GhydraMCP containers and stale port locks. + + This helps recover from crashed processes that left containers or + port locks behind. + + Args: + session_only: Only clean up containers from this session + max_age_hours: Max age for orphaned containers (default: 24 hours) + dry_run: If True, only report what would be cleaned up + + Returns: + Cleanup report with containers and ports cleaned + """ + if not self._check_docker_available(): + return {"error": "Docker is not available on this system"} + + result = { + "dry_run": dry_run, + "containers_cleaned": [], + "ports_cleaned": [], + "errors": [], + } + + # Find orphaned containers + containers = self._find_containers_by_label(session_only=session_only) + + for container in containers: + # Check if container is old enough to be considered orphaned + try: + inspect_result = self._run_docker_cmd( + ["inspect", "--format", "{{index .Config.Labels \"" + self.LABEL_PREFIX + ".started\"}}", container["id"]], + check=False, + ) + started_ts = inspect_result.stdout.strip() + if started_ts.isdigit(): + age_hours = (time.time() - int(started_ts)) / 3600 + if age_hours > max_age_hours: + if dry_run: + result["containers_cleaned"].append({ + "id": container["id"], + "name": container["name"], + "age_hours": round(age_hours, 1), + "would_remove": True, + }) + else: + try: + await self.docker_stop(container["id"], remove=True, ctx=ctx) + result["containers_cleaned"].append({ + "id": container["id"], + "name": container["name"], + "age_hours": round(age_hours, 1), + "removed": True, + }) + except Exception as e: + result["errors"].append(f"Failed to remove {container['id']}: {e}") + except Exception: + pass + + # Clean up stale port locks + if self._port_pool: + stale_ports = self._port_pool.cleanup_stale_locks(max_age_hours * 3600) + result["ports_cleaned"] = stale_ports + + return result + + @mcp_tool( + name="docker_session_info", + description="Get information about this MCP session's containers", + ) + async def docker_session_info( + self, ctx: Optional[Context] = None + ) -> Dict[str, Any]: + """Get information about containers and ports for this MCP session. + + Returns: + Session info including: + - session_id: This session's unique identifier + - containers: Containers started by this session + - allocated_ports: Ports allocated to this session + """ + return { + "session_id": self.session_id, + "containers": self._session_containers, + "allocated_ports": { + port: info + for port, info in self._port_pool.get_allocated_ports().items() + if info.get("session_id") == self.session_id + } if self._port_pool else {}, + "port_pool_range": f"{PORT_POOL_START}-{PORT_POOL_END}", }