Compare commits

...

3 Commits

Author SHA1 Message Date
112c1969c8 Fix port allocation to skip ports used by external Docker containers
Some checks failed
Build Ghidra Plugin / build (push) Has been cancelled
When port 8192 was already in use by a non-MCGhidra container (e.g.,
LTspice), docker_start would fail instead of trying the next port.
Now loops through the pool, checking each candidate against Docker's
published ports before using it.

Also includes Docker build retry improvements from earlier session.
2026-02-11 05:37:40 -07:00
57f042a802 Fix exception handling for functions_create and data_create
- Change from 'except Exception' to bare 'except' to catch Java
  exceptions from Ghidra that don't inherit from Python Exception
- Use sys.exc_info() to safely extract error messages when str(e)
  might fail on certain Java exception types
- Add null checks after getAddress() since it can return None
  instead of throwing for invalid addresses
- Add last-resort response handling to prevent silent connection
  drops when exception handling itself fails

These endpoints now return proper JSON error responses instead of
causing "Empty reply from server" errors.
2026-02-07 06:22:25 -07:00
842035ca92 Remove dead UI tools that can never work in headless MCP mode
ui_get_current_address and ui_get_current_function require Ghidra GUI
context to know what the user has selected. Since MCP always runs
headless (Docker container), these tools always fail with HEADLESS_MODE
error. Removed them to avoid confusion.

Alternative: Use explicit addresses with functions_get(address=...) or
data_list(addr=...) instead.
2026-02-07 06:01:30 -07:00
6 changed files with 95 additions and 67 deletions

View File

@ -25,8 +25,12 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
# Download and extract Ghidra # Download and extract Ghidra
WORKDIR /opt WORKDIR /opt
RUN curl -fsSL "https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${GHIDRA_VERSION}_build/ghidra_${GHIDRA_VERSION}_PUBLIC_${GHIDRA_DATE}.zip" \ # Download with retries and resume support for unreliable connections
-o ghidra.zip \ RUN for i in 1 2 3 4 5; do \
curl -fSL --http1.1 -C - \
"https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${GHIDRA_VERSION}_build/ghidra_${GHIDRA_VERSION}_PUBLIC_${GHIDRA_DATE}.zip" \
-o ghidra.zip && break || sleep 30; \
done \
&& unzip -q ghidra.zip \ && unzip -q ghidra.zip \
&& rm ghidra.zip \ && rm ghidra.zip \
&& mv ghidra_${GHIDRA_VERSION}_PUBLIC ghidra && mv ghidra_${GHIDRA_VERSION}_PUBLIC ghidra
@ -89,8 +93,12 @@ RUN groupadd -g 1001 ghidra && useradd -u 1001 -g ghidra -m -s /bin/bash ghidra
# Download and extract Ghidra (in runtime stage for cleaner image) # Download and extract Ghidra (in runtime stage for cleaner image)
WORKDIR /opt WORKDIR /opt
RUN curl -fsSL "https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${GHIDRA_VERSION}_build/ghidra_${GHIDRA_VERSION}_PUBLIC_${GHIDRA_DATE}.zip" \ # Download with retries and resume support for unreliable connections
-o ghidra.zip \ RUN for i in 1 2 3 4 5; do \
curl -fSL --http1.1 -C - \
"https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${GHIDRA_VERSION}_build/ghidra_${GHIDRA_VERSION}_PUBLIC_${GHIDRA_DATE}.zip" \
-o ghidra.zip && break || sleep 30; \
done \
&& unzip -q ghidra.zip \ && unzip -q ghidra.zip \
&& rm ghidra.zip \ && rm ghidra.zip \
&& mv ghidra_${GHIDRA_VERSION}_PUBLIC ghidra \ && mv ghidra_${GHIDRA_VERSION}_PUBLIC ghidra \

View File

@ -412,14 +412,30 @@ class MCGhidraHandler(HttpHandler):
"success": False, "success": False,
"error": {"code": "NOT_FOUND", "message": "Endpoint not found: %s %s" % (method, path)} "error": {"code": "NOT_FOUND", "message": "Endpoint not found: %s %s" % (method, path)}
}) })
except Exception as e: except:
# Catch ALL exceptions including Java exceptions
import sys
exc_info = sys.exc_info()
try:
# Try to get a string representation safely
if exc_info[1] is not None:
msg = str(exc_info[1])
else:
msg = str(exc_info[0])
except:
msg = "Unknown exception"
try: try:
self._send_response(exchange, 500, { self._send_response(exchange, 500, {
"success": False, "success": False,
"error": {"code": "INTERNAL_ERROR", "message": str(e)} "error": {"code": "INTERNAL_ERROR", "message": msg}
}) })
except: except:
pass # Last resort - at least don't crash silently
try:
exchange.sendResponseHeaders(500, 0)
exchange.getResponseBody().close()
except:
pass
def _send_response(self, exchange, code, data): def _send_response(self, exchange, code, data):
response_bytes = json.dumps(data, indent=2).encode('utf-8') response_bytes = json.dumps(data, indent=2).encode('utf-8')
@ -992,6 +1008,9 @@ class MCGhidraHandler(HttpHandler):
except: except:
return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
if addr is None:
return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}
result_holder = [None] result_holder = [None]
def do_create(): def do_create():
@ -1009,8 +1028,14 @@ class MCGhidraHandler(HttpHandler):
"message": "Function created successfully", "message": "Function created successfully",
}}, 201) }}, 201)
return {"success": False, "error": {"code": "CREATE_FAILED", "message": "Failed to create function at %s" % addr_str}} return {"success": False, "error": {"code": "CREATE_FAILED", "message": "Failed to create function at %s" % addr_str}}
except Exception as e: except:
return {"success": False, "error": {"code": "CREATE_ERROR", "message": str(e)}} import sys
exc = sys.exc_info()[1]
try:
msg = str(exc)
except:
msg = "Failed to create function"
return {"success": False, "error": {"code": "CREATE_ERROR", "message": msg}}
# -- Signature -- # -- Signature --
@ -1156,6 +1181,9 @@ class MCGhidraHandler(HttpHandler):
except: except:
return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
if addr is None:
return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}
# Label creation (newName field) # Label creation (newName field)
new_name = body.get("newName") new_name = body.get("newName")
if new_name: if new_name:
@ -1164,8 +1192,14 @@ class MCGhidraHandler(HttpHandler):
try: try:
with_transaction(self.program, "Create label", do_label) with_transaction(self.program, "Create label", do_label)
return {"success": True, "result": {"address": addr_str, "name": new_name, "message": "Label created"}} return {"success": True, "result": {"address": addr_str, "name": new_name, "message": "Label created"}}
except Exception as e: except:
return {"success": False, "error": {"code": "LABEL_ERROR", "message": str(e)}} import sys
exc = sys.exc_info()[1]
try:
msg = str(exc)
except:
msg = "Failed to create label"
return {"success": False, "error": {"code": "LABEL_ERROR", "message": msg}}
# Data creation (type field) # Data creation (type field)
type_name = body.get("type") type_name = body.get("type")
@ -1187,8 +1221,14 @@ class MCGhidraHandler(HttpHandler):
try: try:
with_transaction(self.program, "Create data", do_create_data) with_transaction(self.program, "Create data", do_create_data)
return ({"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data created"}}, 201) return ({"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data created"}}, 201)
except Exception as e: except:
return {"success": False, "error": {"code": "DATA_ERROR", "message": str(e)}} import sys
exc = sys.exc_info()[1]
try:
msg = str(exc)
except:
msg = "Failed to create data"
return {"success": False, "error": {"code": "DATA_ERROR", "message": msg}}
def handle_data_delete(self, exchange): def handle_data_delete(self, exchange):
if not self.program: if not self.program:

View File

@ -1,6 +1,6 @@
[project] [project]
name = "mcghidra" name = "mcghidra"
version = "2025.12.3" version = "2026.2.11"
description = "AI-assisted reverse engineering bridge: a multi-instance Ghidra plugin exposed via a HATEOAS REST API plus an MCP Python bridge for decompilation, analysis & binary manipulation" description = "AI-assisted reverse engineering bridge: a multi-instance Ghidra plugin exposed via a HATEOAS REST API plus an MCP Python bridge for decompilation, analysis & binary manipulation"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"
@ -22,7 +22,7 @@ requires = ["hatchling"]
build-backend = "hatchling.build" build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel] [tool.hatch.build.targets.wheel]
packages = ["src/mcghidra"] packages = ["mcghidra"]
[tool.hatch.build] [tool.hatch.build]
sources = ["src"] sources = ["src"]

View File

@ -242,41 +242,10 @@ class AnalysisMixin(MCGhidraMixinBase):
return paginated return paginated
@mcp_tool() # NOTE: ui_get_current_address and ui_get_current_function were removed
def ui_get_current_address(self, port: Optional[int] = None) -> Dict[str, Any]: # because they require Ghidra GUI context which is never available in
"""Get the address currently selected in Ghidra's UI. # headless MCP mode. Use functions_get(address=...) or data_list(addr=...)
# with explicit addresses instead.
Args:
port: Ghidra instance port (optional)
Returns:
Current address information
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, "address")
return self.simplify_response(response)
@mcp_tool()
def ui_get_current_function(self, port: Optional[int] = None) -> Dict[str, Any]:
"""Get the function currently selected in Ghidra's UI.
Args:
port: Ghidra instance port (optional)
Returns:
Current function information
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, "function")
return self.simplify_response(response)
@mcp_tool() @mcp_tool()
def comments_get( def comments_get(

View File

@ -581,14 +581,6 @@ class DockerMixin(MCGhidraMixinBase):
if not binary_file.exists(): if not binary_file.exists():
return {"error": f"Binary not found: {binary_path}"} return {"error": f"Binary not found: {binary_path}"}
# Always allocate from pool to prevent conflicts between sessions
port = self.port_pool.allocate(self.session_id)
if port is None:
return {
"error": "Port pool exhausted (8192-8223). Stop some containers first.",
"allocated_ports": self.port_pool.get_allocated_ports(),
}
# Generate container name if not specified # Generate container name if not specified
if name is None: if name is None:
name = self._generate_container_name(binary_file.name) name = self._generate_container_name(binary_file.name)
@ -602,19 +594,38 @@ class DockerMixin(MCGhidraMixinBase):
["ps", "-a", "-q", "-f", f"name=^{name}$"], check=False ["ps", "-a", "-q", "-f", f"name=^{name}$"], check=False
) )
if check_result.stdout.strip(): if check_result.stdout.strip():
self.port_pool.release(port)
return { return {
"error": f"Container '{name}' already exists. Stop it first with docker_stop." "error": f"Container '{name}' already exists. Stop it first with docker_stop."
} }
# Check if port is already in use by a non-pool container # Allocate a port that's both lockable AND not in use by Docker
port_check = await self._run_docker_cmd( # This handles external containers (not managed by MCGhidra) using ports in our range
["ps", "-q", "-f", f"publish={port}"], check=False port = None
) ports_tried = []
if port_check.stdout.strip(): for _ in range(PORT_POOL_END - PORT_POOL_START + 1):
self.port_pool.release(port) candidate_port = self.port_pool.allocate(self.session_id)
if candidate_port is None:
break # Pool exhausted
# Check if this port is already in use by a Docker container
port_check = await self._run_docker_cmd(
["ps", "-q", "-f", f"publish={candidate_port}"], check=False
)
if port_check.stdout.strip():
# Port is in use by Docker - release and try next
ports_tried.append(candidate_port)
self.port_pool.release(candidate_port)
continue
# Found a usable port!
port = candidate_port
break
if port is None:
return { return {
"error": f"Port {port} is already in use by another container" "error": "Port pool exhausted (8192-8223). All ports are in use by Docker containers.",
"ports_checked": ports_tried if ports_tried else "all ports locked by other MCGhidra sessions",
"allocated_ports": self.port_pool.get_allocated_ports(),
} }
# Build label arguments # Build label arguments

2
uv.lock generated
View File

@ -572,7 +572,7 @@ wheels = [
[[package]] [[package]]
name = "mcghidra" name = "mcghidra"
version = "2025.12.3" version = "2026.2.11"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "fastmcp" }, { name = "fastmcp" },