From 4379bea14f0f316a2a7d42ddafa5af715488a0a2 Mon Sep 17 00:00:00 2001 From: Teal Bauer Date: Wed, 18 Jun 2025 00:51:00 +0200 Subject: [PATCH] Release v2.0.0-beta.5: Critical fixes for stable release - Fixed memory_write() endpoint to use correct ProgramEndpoints path - Standardized all error responses to structured format with error codes - Enhanced instances_discover() to return both new and existing instances - Updated API version to 2005 for compatibility tracking - Verified all bridge-to-plugin endpoint mappings are correct - Confirmed route registration order follows proper specificity Ready for v2.0.0 stable release after final testing. --- bridge_mcp_hydra.py | 124 ++++++++++++++---- .../eu/starsong/ghidra/api/ApiConstants.java | 4 +- 2 files changed, 102 insertions(+), 26 deletions(-) diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index 309bd3d..8086f16 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -31,15 +31,15 @@ DEFAULT_GHIDRA_HOST = "localhost" QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) -BRIDGE_VERSION = "v2.0.0-beta.4" -REQUIRED_API_VERSION = 2004 +BRIDGE_VERSION = "v2.0.0-beta.5" +REQUIRED_API_VERSION = 2005 current_instance_port = DEFAULT_GHIDRA_PORT instructions = """ GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware. -First, run `instances_discover()` to find open Ghidra instances. Then use `instances_use(port)` to set your working instance. +First, run `instances_discover()` to find all available Ghidra instances (both already known and newly discovered). Then use `instances_use(port)` to set your working instance. The API is organized into namespaces for different types of operations: - instances_* : For managing Ghidra instances @@ -447,13 +447,18 @@ def register_instance(port: int, url: str = None) -> str: return f"Error: Could not connect to instance at {url}: {str(e)}" def _discover_instances(port_range, host=None, timeout=0.5) -> dict: - """Internal function to discover Ghidra instances by scanning ports""" + """Internal function to discover NEW Ghidra instances by scanning ports + + This function only returns newly discovered instances that weren't already + in the active_instances registry. Use instances_discover() for a complete + list including already known instances. + """ found_instances = [] scan_host = host if host is not None else ghidra_host for port in port_range: if port in active_instances: - continue + continue # Skip already known instances url = f"http://{scan_host}:{port}" try: @@ -597,7 +602,14 @@ def ghidra_instance(port: int = None) -> dict: result = response.get("result", {}) if not isinstance(result, dict): - return {"error": "Invalid response format from Ghidra instance"} + return { + "success": False, + "error": { + "code": "INVALID_RESPONSE", + "message": "Invalid response format from Ghidra instance" + }, + "timestamp": int(time.time() * 1000) + } instance_info = { "port": port, @@ -727,7 +739,14 @@ def function_info_by_address(port: int = None, address: str = None) -> dict: dict: Complete function information including signature, parameters, etc. """ if not address: - return {"error": "Address parameter is required"} + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } port = _get_instance_port(port) @@ -739,10 +758,15 @@ def function_info_by_address(port: int = None, address: str = None) -> dict: if (not isinstance(simplified, dict) or not simplified.get("success", False) or "result" not in simplified): - error = {"error": "Could not get function information"} - if isinstance(simplified, dict) and "error" in simplified: - error["error_details"] = simplified["error"] - return error + return { + "success": False, + "error": { + "code": "FUNCTION_NOT_FOUND", + "message": "Could not get function information", + "details": simplified.get("error") if isinstance(simplified, dict) else None + }, + "timestamp": int(time.time() * 1000) + } # Return just the function data without API metadata return simplified["result"] @@ -759,7 +783,14 @@ def function_info_by_name(port: int = None, name: str = None) -> dict: dict: Complete function information including signature, parameters, etc. """ if not name: - return {"error": "Name parameter is required"} + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Name parameter is required" + }, + "timestamp": int(time.time() * 1000) + } port = _get_instance_port(port) @@ -771,10 +802,15 @@ def function_info_by_name(port: int = None, name: str = None) -> dict: if (not isinstance(simplified, dict) or not simplified.get("success", False) or "result" not in simplified): - error = {"error": "Could not get function information"} - if isinstance(simplified, dict) and "error" in simplified: - error["error_details"] = simplified["error"] - return error + return { + "success": False, + "error": { + "code": "FUNCTION_NOT_FOUND", + "message": "Could not get function information", + "details": simplified.get("error") if isinstance(simplified, dict) else None + }, + "timestamp": int(time.time() * 1000) + } # Return just the function data without API metadata return simplified["result"] @@ -1132,14 +1168,44 @@ def instances_list() -> dict: @mcp.tool() def instances_discover(host: str = None) -> dict: """Discover available Ghidra instances by scanning ports - + Args: host: Optional host to scan (default: configured ghidra_host) - + Returns: - dict: Contains 'found' count and 'instances' list with discovery results + dict: Contains 'found' count, 'new_instances' count, and 'instances' list with all available instances """ - return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5) + # Get newly discovered instances + discovery_result = _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5) + new_instances = discovery_result.get("instances", []) + new_count = len(new_instances) + + # Get all currently known instances (including ones that were already registered) + all_instances = [] + with instances_lock: + for port, info in active_instances.items(): + instance_info = { + "port": port, + "url": info["url"], + "project": info.get("project", ""), + "file": info.get("file", ""), + "plugin_version": info.get("plugin_version", "unknown"), + "api_version": info.get("api_version", "unknown") + } + + # Mark if this was newly discovered in this call + instance_info["newly_discovered"] = any(inst["port"] == port for inst in new_instances) + + all_instances.append(instance_info) + + # Sort by port for consistent ordering + all_instances.sort(key=lambda x: x["port"]) + + return { + "found": len(all_instances), # Total instances available + "new_instances": new_count, # How many were newly discovered + "instances": all_instances # All available instances + } @mcp.tool() def instances_register(port: int, url: str = None) -> str: @@ -1597,7 +1663,8 @@ def memory_write(address: str, bytes_data: str, format: str = "hex", port: int = "format": format } - response = safe_patch(port, f"memory/{address}", payload) + # Memory write is handled by ProgramEndpoints, not MemoryEndpoints + response = safe_patch(port, f"programs/current/memory/{address}", payload) return simplify_response(response) # Xrefs tools @@ -1713,7 +1780,10 @@ def data_create(address: str, data_type: str, size: int = None, port: int = None if not address or not data_type: return { "success": False, - "error": "Address and data_type parameters are required", + "error": { + "code": "MISSING_PARAMETER", + "message": "Address and data_type parameters are required" + }, "timestamp": int(time.time() * 1000) } @@ -1802,7 +1872,10 @@ def data_delete(address: str, port: int = None) -> dict: if not address: return { "success": False, - "error": "Address parameter is required", + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required" + }, "timestamp": int(time.time() * 1000) } @@ -1908,7 +1981,10 @@ def analysis_get_dataflow(address: str, direction: str = "forward", max_steps: i if not address: return { "success": False, - "error": "Address parameter is required", + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required" + }, "timestamp": int(time.time() * 1000) } diff --git a/src/main/java/eu/starsong/ghidra/api/ApiConstants.java b/src/main/java/eu/starsong/ghidra/api/ApiConstants.java index 3ff1cd2..4dedbc8 100644 --- a/src/main/java/eu/starsong/ghidra/api/ApiConstants.java +++ b/src/main/java/eu/starsong/ghidra/api/ApiConstants.java @@ -1,8 +1,8 @@ package eu.starsong.ghidra.api; public class ApiConstants { - public static final String PLUGIN_VERSION = "v2.0.0-beta.4"; - public static final int API_VERSION = 2004; + public static final String PLUGIN_VERSION = "v2.0.0-beta.5"; + public static final int API_VERSION = 2005; public static final int DEFAULT_PORT = 8192; public static final int MAX_PORT_ATTEMPTS = 10; }