Release v2.0.0-beta.5: Critical fixes for stable release
- Fixed memory_write() endpoint to use correct ProgramEndpoints path - Standardized all error responses to structured format with error codes - Enhanced instances_discover() to return both new and existing instances - Updated API version to 2005 for compatibility tracking - Verified all bridge-to-plugin endpoint mappings are correct - Confirmed route registration order follows proper specificity Ready for v2.0.0 stable release after final testing.
This commit is contained in:
parent
977791432f
commit
4379bea14f
@ -31,15 +31,15 @@ DEFAULT_GHIDRA_HOST = "localhost"
|
||||
QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10)
|
||||
FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20)
|
||||
|
||||
BRIDGE_VERSION = "v2.0.0-beta.4"
|
||||
REQUIRED_API_VERSION = 2004
|
||||
BRIDGE_VERSION = "v2.0.0-beta.5"
|
||||
REQUIRED_API_VERSION = 2005
|
||||
|
||||
current_instance_port = DEFAULT_GHIDRA_PORT
|
||||
|
||||
instructions = """
|
||||
GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware.
|
||||
|
||||
First, run `instances_discover()` to find open Ghidra instances. Then use `instances_use(port)` to set your working instance.
|
||||
First, run `instances_discover()` to find all available Ghidra instances (both already known and newly discovered). Then use `instances_use(port)` to set your working instance.
|
||||
|
||||
The API is organized into namespaces for different types of operations:
|
||||
- instances_* : For managing Ghidra instances
|
||||
@ -447,13 +447,18 @@ def register_instance(port: int, url: str = None) -> str:
|
||||
return f"Error: Could not connect to instance at {url}: {str(e)}"
|
||||
|
||||
def _discover_instances(port_range, host=None, timeout=0.5) -> dict:
|
||||
"""Internal function to discover Ghidra instances by scanning ports"""
|
||||
"""Internal function to discover NEW Ghidra instances by scanning ports
|
||||
|
||||
This function only returns newly discovered instances that weren't already
|
||||
in the active_instances registry. Use instances_discover() for a complete
|
||||
list including already known instances.
|
||||
"""
|
||||
found_instances = []
|
||||
scan_host = host if host is not None else ghidra_host
|
||||
|
||||
for port in port_range:
|
||||
if port in active_instances:
|
||||
continue
|
||||
continue # Skip already known instances
|
||||
|
||||
url = f"http://{scan_host}:{port}"
|
||||
try:
|
||||
@ -597,7 +602,14 @@ def ghidra_instance(port: int = None) -> dict:
|
||||
result = response.get("result", {})
|
||||
|
||||
if not isinstance(result, dict):
|
||||
return {"error": "Invalid response format from Ghidra instance"}
|
||||
return {
|
||||
"success": False,
|
||||
"error": {
|
||||
"code": "INVALID_RESPONSE",
|
||||
"message": "Invalid response format from Ghidra instance"
|
||||
},
|
||||
"timestamp": int(time.time() * 1000)
|
||||
}
|
||||
|
||||
instance_info = {
|
||||
"port": port,
|
||||
@ -727,7 +739,14 @@ def function_info_by_address(port: int = None, address: str = None) -> dict:
|
||||
dict: Complete function information including signature, parameters, etc.
|
||||
"""
|
||||
if not address:
|
||||
return {"error": "Address parameter is required"}
|
||||
return {
|
||||
"success": False,
|
||||
"error": {
|
||||
"code": "MISSING_PARAMETER",
|
||||
"message": "Address parameter is required"
|
||||
},
|
||||
"timestamp": int(time.time() * 1000)
|
||||
}
|
||||
|
||||
port = _get_instance_port(port)
|
||||
|
||||
@ -739,10 +758,15 @@ def function_info_by_address(port: int = None, address: str = None) -> dict:
|
||||
if (not isinstance(simplified, dict) or
|
||||
not simplified.get("success", False) or
|
||||
"result" not in simplified):
|
||||
error = {"error": "Could not get function information"}
|
||||
if isinstance(simplified, dict) and "error" in simplified:
|
||||
error["error_details"] = simplified["error"]
|
||||
return error
|
||||
return {
|
||||
"success": False,
|
||||
"error": {
|
||||
"code": "FUNCTION_NOT_FOUND",
|
||||
"message": "Could not get function information",
|
||||
"details": simplified.get("error") if isinstance(simplified, dict) else None
|
||||
},
|
||||
"timestamp": int(time.time() * 1000)
|
||||
}
|
||||
|
||||
# Return just the function data without API metadata
|
||||
return simplified["result"]
|
||||
@ -759,7 +783,14 @@ def function_info_by_name(port: int = None, name: str = None) -> dict:
|
||||
dict: Complete function information including signature, parameters, etc.
|
||||
"""
|
||||
if not name:
|
||||
return {"error": "Name parameter is required"}
|
||||
return {
|
||||
"success": False,
|
||||
"error": {
|
||||
"code": "MISSING_PARAMETER",
|
||||
"message": "Name parameter is required"
|
||||
},
|
||||
"timestamp": int(time.time() * 1000)
|
||||
}
|
||||
|
||||
port = _get_instance_port(port)
|
||||
|
||||
@ -771,10 +802,15 @@ def function_info_by_name(port: int = None, name: str = None) -> dict:
|
||||
if (not isinstance(simplified, dict) or
|
||||
not simplified.get("success", False) or
|
||||
"result" not in simplified):
|
||||
error = {"error": "Could not get function information"}
|
||||
if isinstance(simplified, dict) and "error" in simplified:
|
||||
error["error_details"] = simplified["error"]
|
||||
return error
|
||||
return {
|
||||
"success": False,
|
||||
"error": {
|
||||
"code": "FUNCTION_NOT_FOUND",
|
||||
"message": "Could not get function information",
|
||||
"details": simplified.get("error") if isinstance(simplified, dict) else None
|
||||
},
|
||||
"timestamp": int(time.time() * 1000)
|
||||
}
|
||||
|
||||
# Return just the function data without API metadata
|
||||
return simplified["result"]
|
||||
@ -1132,14 +1168,44 @@ def instances_list() -> dict:
|
||||
@mcp.tool()
|
||||
def instances_discover(host: str = None) -> dict:
|
||||
"""Discover available Ghidra instances by scanning ports
|
||||
|
||||
|
||||
Args:
|
||||
host: Optional host to scan (default: configured ghidra_host)
|
||||
|
||||
|
||||
Returns:
|
||||
dict: Contains 'found' count and 'instances' list with discovery results
|
||||
dict: Contains 'found' count, 'new_instances' count, and 'instances' list with all available instances
|
||||
"""
|
||||
return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5)
|
||||
# Get newly discovered instances
|
||||
discovery_result = _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5)
|
||||
new_instances = discovery_result.get("instances", [])
|
||||
new_count = len(new_instances)
|
||||
|
||||
# Get all currently known instances (including ones that were already registered)
|
||||
all_instances = []
|
||||
with instances_lock:
|
||||
for port, info in active_instances.items():
|
||||
instance_info = {
|
||||
"port": port,
|
||||
"url": info["url"],
|
||||
"project": info.get("project", ""),
|
||||
"file": info.get("file", ""),
|
||||
"plugin_version": info.get("plugin_version", "unknown"),
|
||||
"api_version": info.get("api_version", "unknown")
|
||||
}
|
||||
|
||||
# Mark if this was newly discovered in this call
|
||||
instance_info["newly_discovered"] = any(inst["port"] == port for inst in new_instances)
|
||||
|
||||
all_instances.append(instance_info)
|
||||
|
||||
# Sort by port for consistent ordering
|
||||
all_instances.sort(key=lambda x: x["port"])
|
||||
|
||||
return {
|
||||
"found": len(all_instances), # Total instances available
|
||||
"new_instances": new_count, # How many were newly discovered
|
||||
"instances": all_instances # All available instances
|
||||
}
|
||||
|
||||
@mcp.tool()
|
||||
def instances_register(port: int, url: str = None) -> str:
|
||||
@ -1597,7 +1663,8 @@ def memory_write(address: str, bytes_data: str, format: str = "hex", port: int =
|
||||
"format": format
|
||||
}
|
||||
|
||||
response = safe_patch(port, f"memory/{address}", payload)
|
||||
# Memory write is handled by ProgramEndpoints, not MemoryEndpoints
|
||||
response = safe_patch(port, f"programs/current/memory/{address}", payload)
|
||||
return simplify_response(response)
|
||||
|
||||
# Xrefs tools
|
||||
@ -1713,7 +1780,10 @@ def data_create(address: str, data_type: str, size: int = None, port: int = None
|
||||
if not address or not data_type:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Address and data_type parameters are required",
|
||||
"error": {
|
||||
"code": "MISSING_PARAMETER",
|
||||
"message": "Address and data_type parameters are required"
|
||||
},
|
||||
"timestamp": int(time.time() * 1000)
|
||||
}
|
||||
|
||||
@ -1802,7 +1872,10 @@ def data_delete(address: str, port: int = None) -> dict:
|
||||
if not address:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Address parameter is required",
|
||||
"error": {
|
||||
"code": "MISSING_PARAMETER",
|
||||
"message": "Address parameter is required"
|
||||
},
|
||||
"timestamp": int(time.time() * 1000)
|
||||
}
|
||||
|
||||
@ -1908,7 +1981,10 @@ def analysis_get_dataflow(address: str, direction: str = "forward", max_steps: i
|
||||
if not address:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Address parameter is required",
|
||||
"error": {
|
||||
"code": "MISSING_PARAMETER",
|
||||
"message": "Address parameter is required"
|
||||
},
|
||||
"timestamp": int(time.time() * 1000)
|
||||
}
|
||||
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
package eu.starsong.ghidra.api;
|
||||
|
||||
public class ApiConstants {
|
||||
public static final String PLUGIN_VERSION = "v2.0.0-beta.4";
|
||||
public static final int API_VERSION = 2004;
|
||||
public static final String PLUGIN_VERSION = "v2.0.0-beta.5";
|
||||
public static final int API_VERSION = 2005;
|
||||
public static final int DEFAULT_PORT = 8192;
|
||||
public static final int MAX_PORT_ATTEMPTS = 10;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user