Compare commits

..

No commits in common. "112c1969c805b88b5b3071d5471e3331cca258a8" and "c930e7c05985bab625a49df707e97a5c05e0c631" have entirely different histories.

6 changed files with 67 additions and 95 deletions

View File

@ -25,12 +25,8 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
# Download and extract Ghidra
WORKDIR /opt
# Download with retries and resume support for unreliable connections
RUN for i in 1 2 3 4 5; do \
curl -fSL --http1.1 -C - \
"https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${GHIDRA_VERSION}_build/ghidra_${GHIDRA_VERSION}_PUBLIC_${GHIDRA_DATE}.zip" \
-o ghidra.zip && break || sleep 30; \
done \
RUN curl -fsSL "https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${GHIDRA_VERSION}_build/ghidra_${GHIDRA_VERSION}_PUBLIC_${GHIDRA_DATE}.zip" \
-o ghidra.zip \
&& unzip -q ghidra.zip \
&& rm ghidra.zip \
&& mv ghidra_${GHIDRA_VERSION}_PUBLIC ghidra
@ -93,12 +89,8 @@ RUN groupadd -g 1001 ghidra && useradd -u 1001 -g ghidra -m -s /bin/bash ghidra
# Download and extract Ghidra (in runtime stage for cleaner image)
WORKDIR /opt
# Download with retries and resume support for unreliable connections
RUN for i in 1 2 3 4 5; do \
curl -fSL --http1.1 -C - \
"https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${GHIDRA_VERSION}_build/ghidra_${GHIDRA_VERSION}_PUBLIC_${GHIDRA_DATE}.zip" \
-o ghidra.zip && break || sleep 30; \
done \
RUN curl -fsSL "https://github.com/NationalSecurityAgency/ghidra/releases/download/Ghidra_${GHIDRA_VERSION}_build/ghidra_${GHIDRA_VERSION}_PUBLIC_${GHIDRA_DATE}.zip" \
-o ghidra.zip \
&& unzip -q ghidra.zip \
&& rm ghidra.zip \
&& mv ghidra_${GHIDRA_VERSION}_PUBLIC ghidra \

View File

@ -412,28 +412,12 @@ class MCGhidraHandler(HttpHandler):
"success": False,
"error": {"code": "NOT_FOUND", "message": "Endpoint not found: %s %s" % (method, path)}
})
except:
# Catch ALL exceptions including Java exceptions
import sys
exc_info = sys.exc_info()
try:
# Try to get a string representation safely
if exc_info[1] is not None:
msg = str(exc_info[1])
else:
msg = str(exc_info[0])
except:
msg = "Unknown exception"
except Exception as e:
try:
self._send_response(exchange, 500, {
"success": False,
"error": {"code": "INTERNAL_ERROR", "message": msg}
"error": {"code": "INTERNAL_ERROR", "message": str(e)}
})
except:
# Last resort - at least don't crash silently
try:
exchange.sendResponseHeaders(500, 0)
exchange.getResponseBody().close()
except:
pass
@ -1008,9 +992,6 @@ class MCGhidraHandler(HttpHandler):
except:
return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
if addr is None:
return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}
result_holder = [None]
def do_create():
@ -1028,14 +1009,8 @@ class MCGhidraHandler(HttpHandler):
"message": "Function created successfully",
}}, 201)
return {"success": False, "error": {"code": "CREATE_FAILED", "message": "Failed to create function at %s" % addr_str}}
except:
import sys
exc = sys.exc_info()[1]
try:
msg = str(exc)
except:
msg = "Failed to create function"
return {"success": False, "error": {"code": "CREATE_ERROR", "message": msg}}
except Exception as e:
return {"success": False, "error": {"code": "CREATE_ERROR", "message": str(e)}}
# -- Signature --
@ -1181,9 +1156,6 @@ class MCGhidraHandler(HttpHandler):
except:
return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
if addr is None:
return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}
# Label creation (newName field)
new_name = body.get("newName")
if new_name:
@ -1192,14 +1164,8 @@ class MCGhidraHandler(HttpHandler):
try:
with_transaction(self.program, "Create label", do_label)
return {"success": True, "result": {"address": addr_str, "name": new_name, "message": "Label created"}}
except:
import sys
exc = sys.exc_info()[1]
try:
msg = str(exc)
except:
msg = "Failed to create label"
return {"success": False, "error": {"code": "LABEL_ERROR", "message": msg}}
except Exception as e:
return {"success": False, "error": {"code": "LABEL_ERROR", "message": str(e)}}
# Data creation (type field)
type_name = body.get("type")
@ -1221,14 +1187,8 @@ class MCGhidraHandler(HttpHandler):
try:
with_transaction(self.program, "Create data", do_create_data)
return ({"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data created"}}, 201)
except:
import sys
exc = sys.exc_info()[1]
try:
msg = str(exc)
except:
msg = "Failed to create data"
return {"success": False, "error": {"code": "DATA_ERROR", "message": msg}}
except Exception as e:
return {"success": False, "error": {"code": "DATA_ERROR", "message": str(e)}}
def handle_data_delete(self, exchange):
if not self.program:

View File

@ -1,6 +1,6 @@
[project]
name = "mcghidra"
version = "2026.2.11"
version = "2025.12.3"
description = "AI-assisted reverse engineering bridge: a multi-instance Ghidra plugin exposed via a HATEOAS REST API plus an MCP Python bridge for decompilation, analysis & binary manipulation"
readme = "README.md"
requires-python = ">=3.11"
@ -22,7 +22,7 @@ requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["mcghidra"]
packages = ["src/mcghidra"]
[tool.hatch.build]
sources = ["src"]

View File

@ -242,10 +242,41 @@ class AnalysisMixin(MCGhidraMixinBase):
return paginated
# NOTE: ui_get_current_address and ui_get_current_function were removed
# because they require Ghidra GUI context which is never available in
# headless MCP mode. Use functions_get(address=...) or data_list(addr=...)
# with explicit addresses instead.
@mcp_tool()
def ui_get_current_address(self, port: Optional[int] = None) -> Dict[str, Any]:
"""Get the address currently selected in Ghidra's UI.
Args:
port: Ghidra instance port (optional)
Returns:
Current address information
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, "address")
return self.simplify_response(response)
@mcp_tool()
def ui_get_current_function(self, port: Optional[int] = None) -> Dict[str, Any]:
"""Get the function currently selected in Ghidra's UI.
Args:
port: Ghidra instance port (optional)
Returns:
Current function information
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, "function")
return self.simplify_response(response)
@mcp_tool()
def comments_get(

View File

@ -581,6 +581,14 @@ class DockerMixin(MCGhidraMixinBase):
if not binary_file.exists():
return {"error": f"Binary not found: {binary_path}"}
# Always allocate from pool to prevent conflicts between sessions
port = self.port_pool.allocate(self.session_id)
if port is None:
return {
"error": "Port pool exhausted (8192-8223). Stop some containers first.",
"allocated_ports": self.port_pool.get_allocated_ports(),
}
# Generate container name if not specified
if name is None:
name = self._generate_container_name(binary_file.name)
@ -594,38 +602,19 @@ class DockerMixin(MCGhidraMixinBase):
["ps", "-a", "-q", "-f", f"name=^{name}$"], check=False
)
if check_result.stdout.strip():
self.port_pool.release(port)
return {
"error": f"Container '{name}' already exists. Stop it first with docker_stop."
}
# Allocate a port that's both lockable AND not in use by Docker
# This handles external containers (not managed by MCGhidra) using ports in our range
port = None
ports_tried = []
for _ in range(PORT_POOL_END - PORT_POOL_START + 1):
candidate_port = self.port_pool.allocate(self.session_id)
if candidate_port is None:
break # Pool exhausted
# Check if this port is already in use by a Docker container
# Check if port is already in use by a non-pool container
port_check = await self._run_docker_cmd(
["ps", "-q", "-f", f"publish={candidate_port}"], check=False
["ps", "-q", "-f", f"publish={port}"], check=False
)
if port_check.stdout.strip():
# Port is in use by Docker - release and try next
ports_tried.append(candidate_port)
self.port_pool.release(candidate_port)
continue
# Found a usable port!
port = candidate_port
break
if port is None:
self.port_pool.release(port)
return {
"error": "Port pool exhausted (8192-8223). All ports are in use by Docker containers.",
"ports_checked": ports_tried if ports_tried else "all ports locked by other MCGhidra sessions",
"allocated_ports": self.port_pool.get_allocated_ports(),
"error": f"Port {port} is already in use by another container"
}
# Build label arguments

2
uv.lock generated
View File

@ -572,7 +572,7 @@ wheels = [
[[package]]
name = "mcghidra"
version = "2026.2.11"
version = "2025.12.3"
source = { editable = "." }
dependencies = [
{ name = "fastmcp" },