diff --git a/GHIDRA_HTTP_API.md b/GHIDRA_HTTP_API.md new file mode 100644 index 0000000..8dbb19f --- /dev/null +++ b/GHIDRA_HTTP_API.md @@ -0,0 +1,73 @@ +# Ghidra HTTP API Documentation + +## Base URL +`http://{host}:{port}/` (default port: 8192) + +## Endpoints + +### Instance Management +- `GET /instances` - List active instances +- `GET /info` - Get project information +- `GET /` - Root endpoint with basic info + +### Program Analysis +- `GET /functions` - List functions + - Parameters: + - `offset` - Pagination offset + - `limit` - Max items to return + - `query` - Search string for function names + +- `GET /functions/{name}` - Get function details + - Parameters: + - `cCode` - Return C-style code (true/false) + - `syntaxTree` - Include syntax tree (true/false) + - `simplificationStyle` - Decompiler style + +- `GET /get_function_by_address` - Get function by address + - Parameters: + - `address` - Memory address in hex + +- `GET /classes` - List classes +- `GET /segments` - List memory segments +- `GET /symbols/imports` - List imported symbols +- `GET /symbols/exports` - List exported symbols +- `GET /namespaces` - List namespaces +- `GET /data` - List data items +- `GET /variables` - List global variables +- `GET /functions/{name}/variables` - List function variables + +### Modifications +- `POST /functions/{name}` - Rename function + - Body: `{"newName": string}` + +- `POST /data` - Rename data at address + - Body: `{"address": string, "newName": string}` + +- `POST /set_decompiler_comment` - Add decompiler comment + - Body: `{"address": string, "comment": string}` + +- `POST /set_disassembly_comment` - Add disassembly comment + - Body: `{"address": string, "comment": string}` + +- `POST /rename_local_variable` - Rename local variable + - Body: `{"functionAddress": string, "oldName": string, "newName": string}` + +- `POST /rename_function_by_address` - Rename function by address + - Body: `{"functionAddress": string, "newName": string}` + +- `POST /set_function_prototype` - Update function prototype + - Body: `{"functionAddress": string, "prototype": string}` + +- `POST /set_local_variable_type` - Change variable type + - Body: `{"functionAddress": string, "variableName": string, "newType": string}` + +## Response Format +All endpoints return JSON with standard structure: +```json +{ + "success": boolean, + "result": object|array, + "error": string, // if success=false + "timestamp": number, + "port": number +} diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index 89570fa..1daf174 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -27,8 +27,8 @@ instances_lock = Lock() DEFAULT_GHIDRA_PORT = 8192 DEFAULT_GHIDRA_HOST = "localhost" # Port ranges for scanning -QUICK_DISCOVERY_RANGE = range(8192, 8202) # Limited range for interactive/triggered discovery (10 ports) -FULL_DISCOVERY_RANGE = range(8192, 8212) # Wider range for background discovery (20 ports) +QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) +FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) instructions = """ GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware. @@ -39,7 +39,6 @@ First, run `discover_instances` to find open Ghidra instances. List tools to see mcp = FastMCP("GhydraMCP", instructions=instructions) ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST) -# print(f"Using Ghidra host: {ghidra_host}") def get_instance_url(port: int) -> str: """Get URL for a Ghidra instance by port""" @@ -47,7 +46,6 @@ def get_instance_url(port: int) -> str: if port in active_instances: return active_instances[port]["url"] - # Auto-register if not found but port is valid if 8192 <= port <= 65535: register_instance(port) if port in active_instances: @@ -79,10 +77,8 @@ def _make_request(method: str, port: int, endpoint: str, params: dict = None, js if headers: request_headers.update(headers) - # Origin validation for state-changing requests - is_state_changing = method.upper() in ["POST", "PUT", "DELETE"] # Add other methods if needed + is_state_changing = method.upper() in ["POST", "PUT", "PATCH", "DELETE"] if is_state_changing: - # Extract headers from json_data if present, otherwise use provided headers check_headers = json_data.get("headers", {}) if isinstance(json_data, dict) else (headers or {}) if not validate_origin(check_headers): return { @@ -91,11 +87,10 @@ def _make_request(method: str, port: int, endpoint: str, params: dict = None, js "status_code": 403, "timestamp": int(time.time() * 1000) } - # Set Content-Type for POST/PUT if sending JSON if json_data is not None: request_headers['Content-Type'] = 'application/json' elif data is not None: - request_headers['Content-Type'] = 'text/plain' # Or appropriate type + request_headers['Content-Type'] = 'text/plain' try: response = requests.request( @@ -105,35 +100,29 @@ def _make_request(method: str, port: int, endpoint: str, params: dict = None, js json=json_data, data=data, headers=request_headers, - timeout=10 # Increased timeout slightly + timeout=10 ) - # Attempt to parse JSON regardless of status code, as errors might be JSON try: parsed_json = response.json() - # Add timestamp if not present in the response from Ghidra if isinstance(parsed_json, dict) and "timestamp" not in parsed_json: parsed_json["timestamp"] = int(time.time() * 1000) return parsed_json except ValueError: - # Handle non-JSON responses (e.g., unexpected errors, successful plain text) if response.ok: - # Success, but not JSON - wrap it? Or assume plugin *always* returns JSON? - # For now, treat unexpected non-JSON success as an error from the plugin side. return { "success": False, "error": "Received non-JSON success response from Ghidra plugin", "status_code": response.status_code, - "response_text": response.text[:500], # Limit text length + "response_text": response.text[:500], "timestamp": int(time.time() * 1000) } else: - # Error response was not JSON return { "success": False, "error": f"HTTP {response.status_code} - Non-JSON error response", "status_code": response.status_code, - "response_text": response.text[:500], # Limit text length + "response_text": response.text[:500], "timestamp": int(time.time() * 1000) } @@ -141,14 +130,14 @@ def _make_request(method: str, port: int, endpoint: str, params: dict = None, js return { "success": False, "error": "Request timed out", - "status_code": 408, # Request Timeout + "status_code": 408, "timestamp": int(time.time() * 1000) } except requests.exceptions.ConnectionError: return { "success": False, "error": f"Failed to connect to Ghidra instance at {url}", - "status_code": 503, # Service Unavailable + "status_code": 503, "timestamp": int(time.time() * 1000) } except Exception as e: @@ -160,12 +149,11 @@ def _make_request(method: str, port: int, endpoint: str, params: dict = None, js } def safe_get(port: int, endpoint: str, params: dict = None) -> dict: - """Perform a GET request to a specific Ghidra instance and return JSON response""" + """Make GET request to Ghidra instance""" return _make_request("GET", port, endpoint, params=params) def safe_put(port: int, endpoint: str, data: dict) -> dict: - """Perform a PUT request to a specific Ghidra instance with JSON payload""" - # Pass headers if they exist within the data dict + """Make PUT request to Ghidra instance with JSON payload""" headers = data.pop("headers", None) if isinstance(data, dict) else None return _make_request("PUT", port, endpoint, json_data=data, headers=headers) @@ -179,14 +167,18 @@ def safe_post(port: int, endpoint: str, data: dict | str) -> dict: headers = data.pop("headers", None) json_payload = data else: - text_payload = data # Assume string data is text/plain + text_payload = data return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers) # Instance management tools @mcp.tool() def list_instances() -> dict: - """List all active Ghidra instances""" + """List all active Ghidra instances + + Returns: + dict: Contains 'instances' list with port, url, project and file info for each instance + """ with instances_lock: return { "instances": [ @@ -202,22 +194,27 @@ def list_instances() -> dict: @mcp.tool() def register_instance(port: int, url: str = None) -> str: - """Register a new Ghidra instance""" + """Register a new Ghidra instance + + Args: + port: Port number of the Ghidra instance + url: Optional URL if different from default http://host:port + + Returns: + str: Confirmation message or error + """ if url is None: url = f"http://{ghidra_host}:{port}" - # Verify instance is reachable before registering try: test_url = f"{url}/instances" response = requests.get(test_url, timeout=2) if not response.ok: return f"Error: Instance at {url} is not responding properly" - # Try to get project info project_info = {"url": url} try: - # Try the root endpoint first root_url = f"{url}/" root_response = requests.get(root_url, timeout=1.5) # Short timeout for root @@ -225,7 +222,6 @@ def register_instance(port: int, url: str = None) -> str: try: root_data = root_response.json() - # Extract basic information from root if "project" in root_data and root_data["project"]: project_info["project"] = root_data["project"] if "file" in root_data and root_data["file"]: @@ -234,7 +230,6 @@ def register_instance(port: int, url: str = None) -> str: except Exception as e: print(f"Error parsing root info: {e}", file=sys.stderr) - # If we don't have project info yet, try the /info endpoint as a fallback if not project_info.get("project") and not project_info.get("file"): info_url = f"{url}/info" @@ -243,11 +238,9 @@ def register_instance(port: int, url: str = None) -> str: if info_response.ok: try: info_data = info_response.json() - # Extract relevant information if "project" in info_data and info_data["project"]: project_info["project"] = info_data["project"] - # Handle file information file_info = info_data.get("file", {}) if isinstance(file_info, dict) and file_info.get("name"): project_info["file"] = file_info.get("name", "") @@ -272,7 +265,14 @@ def register_instance(port: int, url: str = None) -> str: @mcp.tool() def unregister_instance(port: int) -> str: - """Unregister a Ghidra instance""" + """Unregister a Ghidra instance + + Args: + port: Port number of the instance to unregister + + Returns: + str: Confirmation message or error + """ with instances_lock: if port in active_instances: del active_instances[port] @@ -280,11 +280,14 @@ def unregister_instance(port: int) -> str: return f"No instance found on port {port}" @mcp.tool() -def discover_instances(host: str = None) -> dict: - """Auto-discover Ghidra instances by scanning ports (quick discovery with limited range) - +def discover_instances(host: str = null) -> dict: + """Discover available Ghidra instances by scanning ports + Args: - host: Optional host to scan (defaults to configured ghidra_host) + host: Optional host to scan (default: configured ghidra_host) + + Returns: + dict: Contains 'found' count and 'instances' list with discovery results """ return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5) @@ -300,7 +303,7 @@ def _discover_instances(port_range, host=None, timeout=0.5) -> dict: url = f"http://{scan_host}:{port}" try: test_url = f"{url}/instances" - response = requests.get(test_url, timeout=timeout) # Short timeout for scanning + response = requests.get(test_url, timeout=timeout) if response.ok: result = register_instance(port, url) found_instances.append({"port": port, "url": url, "result": result}) @@ -315,36 +318,45 @@ def _discover_instances(port_range, host=None, timeout=0.5) -> dict: @mcp.tool() def list_functions(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list: - """List all functions in the current program - + """List functions in the current program with pagination + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) - limit: Maximum number of segments to return (default: 100) - + limit: Maximum items to return (default: 100) + Returns: - List of strings with function names and addresses + list: Function names and addresses """ return safe_get(port, "functions", {"offset": offset, "limit": limit}) @mcp.tool() def list_classes(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list: - """List all classes with pagination""" + """List classes in the current program with pagination + + Args: + port: Ghidra instance port (default: 8192) + offset: Pagination offset (default: 0) + limit: Maximum items to return (default: 100) + + Returns: + list: Class names and info + """ return safe_get(port, "classes", {"offset": offset, "limit": limit}) @mcp.tool() def get_function(port: int = DEFAULT_GHIDRA_PORT, name: str = "", cCode: bool = True, syntaxTree: bool = False, simplificationStyle: str = "normalize") -> dict: - """Get decompiled code for a specific function - + """Get decompiled code for a function + Args: port: Ghidra instance port (default: 8192) - name: Name of the function to decompile - cCode: Whether to output C code (default: True) - syntaxTree: Whether to include syntax tree (default: False) - simplificationStyle: Decompiler analysis style (default: "normalize") - + name: Function name to decompile + cCode: Return C-style code (default: True) + syntaxTree: Include syntax tree (default: False) + simplificationStyle: Decompiler style (default: "normalize") + Returns: - Dict containing function details including decompiled code + dict: Contains function name, address, signature and decompilation """ response = safe_get(port, f"functions/{quote(name)}", { "cCode": str(cCode).lower(), @@ -352,124 +364,125 @@ def get_function(port: int = DEFAULT_GHIDRA_PORT, name: str = "", cCode: bool = "simplificationStyle": simplificationStyle }) - # Check if the response is a string (old format) or already a dict with proper structure - if isinstance(response, dict) and "success" in response: - # If it's already a properly structured response, return it - return response - elif isinstance(response, str): - # If it's a string (old format), wrap it in a proper structure - return { - "success": True, - "result": { - "name": name, - "address": "", # We don't have the address here - "signature": "", # We don't have the signature here - "decompilation": response - }, - "timestamp": int(time.time() * 1000), - "port": port - } - else: - # Unexpected format, return an error + if not isinstance(response, dict) or "success" not in response: return { "success": False, - "error": "Unexpected response format from Ghidra plugin", + "error": "Invalid response format from Ghidra plugin", "timestamp": int(time.time() * 1000), "port": port } + return response @mcp.tool() def update_function(port: int = DEFAULT_GHIDRA_PORT, name: str = "", new_name: str = "") -> str: - """Rename a function (Modify -> POST)""" + """Rename a function + + Args: + port: Ghidra instance port (default: 8192) + name: Current function name + new_name: New function name + + Returns: + str: Confirmation message or error + """ return safe_post(port, f"functions/{quote(name)}", {"newName": new_name}) @mcp.tool() def update_data(port: int = DEFAULT_GHIDRA_PORT, address: str = "", new_name: str = "") -> str: - """Rename data at specified address (Modify -> POST)""" + """Rename data at a memory address + + Args: + port: Ghidra instance port (default: 8192) + address: Memory address in hex format + new_name: New name for the data + + Returns: + str: Confirmation message or error + """ return safe_post(port, "data", {"address": address, "newName": new_name}) @mcp.tool() def list_segments(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list: - """List all memory segments in the current program with pagination - + """List memory segments with pagination + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) - limit: Maximum number of segments to return (default: 100) - + limit: Maximum items to return (default: 100) + Returns: - List of segment information strings + list: Segment information strings """ return safe_get(port, "segments", {"offset": offset, "limit": limit}) @mcp.tool() def list_imports(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list: - """List all imported symbols with pagination - + """List imported symbols with pagination + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) - limit: Maximum number of imports to return (default: 100) - + limit: Maximum items to return (default: 100) + Returns: - List of import information strings + list: Imported symbol information """ return safe_get(port, "symbols/imports", {"offset": offset, "limit": limit}) @mcp.tool() def list_exports(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list: - """List all exported symbols with pagination - + """List exported symbols with pagination + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) - limit: Maximum number of exports to return (default: 100) - + limit: Maximum items to return (default: 100) + Returns: - List of export information strings + list: Exported symbol information """ return safe_get(port, "symbols/exports", {"offset": offset, "limit": limit}) @mcp.tool() def list_namespaces(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list: - """List all namespaces in the current program with pagination - + """List namespaces with pagination + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) - limit: Maximum number of namespaces to return (default: 100) - + limit: Maximum items to return (default: 100) + Returns: - List of namespace information strings + list: Namespace information strings """ return safe_get(port, "namespaces", {"offset": offset, "limit": limit}) @mcp.tool() def list_data_items(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list: - """List all defined data items with pagination - + """List data items with pagination + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) - limit: Maximum number of data items to return (default: 100) - + limit: Maximum items to return (default: 100) + Returns: - List of data item information strings + list: Data item information strings """ return safe_get(port, "data", {"offset": offset, "limit": limit}) @mcp.tool() def search_functions_by_name(port: int = DEFAULT_GHIDRA_PORT, query: str = "", offset: int = 0, limit: int = 100) -> list: - """Search for functions by name with pagination - + """Search functions by name with pagination + Args: port: Ghidra instance port (default: 8192) - query: Search string to match against function names + query: Search string for function names offset: Pagination offset (default: 0) - limit: Maximum number of functions to return (default: 100) - + limit: Maximum items to return (default: 100) + Returns: - List of matching function information strings or error message if query is empty + list: Matching function info or error if query empty """ if not query: return ["Error: query string is required"] @@ -477,23 +490,20 @@ def search_functions_by_name(port: int = DEFAULT_GHIDRA_PORT, query: str = "", o @mcp.tool() def get_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> dict: - """Get function details by its memory address - + """Get function details by memory address + Args: port: Ghidra instance port (default: 8192) - address: Memory address of the function (hex string) - + address: Memory address in hex format + Returns: - Dict containing function details including name, address, signature, and decompilation + dict: Contains function name, address, signature and decompilation """ response = safe_get(port, "get_function_by_address", {"address": address}) - # Check if the response is a string (old format) or already a dict with proper structure if isinstance(response, dict) and "success" in response: - # If it's already a properly structured response, return it return response elif isinstance(response, str): - # If it's a string (old format), wrap it in a proper structure return { "success": True, "result": { @@ -504,7 +514,6 @@ def get_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "") "port": port } else: - # Unexpected format, return an error return { "success": False, "error": "Unexpected response format from Ghidra plugin", @@ -513,44 +522,42 @@ def get_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "") } @mcp.tool() -def get_current_address(port: int = DEFAULT_GHIDRA_PORT) -> dict: # Return dict - """Get the address currently selected in Ghidra's UI - +def get_current_address(port: int = DEFAULT_GHIDRA_PORT) -> dict: + """Get currently selected address in Ghidra UI + Args: port: Ghidra instance port (default: 8192) - + Returns: - Dict containing the current memory address (hex format) + dict: Contains current memory address in hex format """ - # Directly return the dictionary from safe_get return safe_get(port, "get_current_address") @mcp.tool() -def get_current_function(port: int = DEFAULT_GHIDRA_PORT) -> dict: # Return dict - """Get the function currently selected in Ghidra's UI - +def get_current_function(port: int = DEFAULT_GHIDRA_PORT) -> dict: + """Get currently selected function in Ghidra UI + Args: port: Ghidra instance port (default: 8192) - + Returns: - Dict containing function details including name, address, and signature + dict: Contains function name, address and signature """ - # Directly return the dictionary from safe_get return safe_get(port, "get_current_function") @mcp.tool() def decompile_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "", cCode: bool = True, syntaxTree: bool = False, simplificationStyle: str = "normalize") -> dict: - """Decompile a function at a specific memory address - + """Decompile function at memory address + Args: port: Ghidra instance port (default: 8192) - address: Memory address of the function (hex string) - cCode: Whether to output C code (default: True) - syntaxTree: Whether to include syntax tree (default: False) - simplificationStyle: Decompiler analysis style (default: "normalize") - + address: Memory address in hex format + cCode: Return C-style code (default: True) + syntaxTree: Include syntax tree (default: False) + simplificationStyle: Decompiler style (default: "normalize") + Returns: - Dict containing the decompiled pseudocode in the 'result.decompilation' field + dict: Contains decompiled code in 'result.decompilation' """ response = safe_get(port, "decompile_function", { "address": address, @@ -559,125 +566,111 @@ def decompile_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str "simplificationStyle": simplificationStyle }) - # Check if the response is a string (old format) or already a dict with proper structure - if isinstance(response, dict) and "success" in response: - # If it's already a properly structured response, return it - return response - elif isinstance(response, str): - # If it's a string (old format), wrap it in a proper structure - return { - "success": True, - "result": { - "decompilation": response - }, - "timestamp": int(time.time() * 1000), - "port": port - } - else: - # Unexpected format, return an error + if not isinstance(response, dict) or "success" not in response: return { "success": False, - "error": "Unexpected response format from Ghidra plugin", + "error": "Invalid response format from Ghidra plugin", "timestamp": int(time.time() * 1000), "port": port } + return response @mcp.tool() -def disassemble_function(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> dict: # Return dict - """Get disassembly for a function at a specific address - +def disassemble_function(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> dict: + """Get disassembly for function at address + Args: port: Ghidra instance port (default: 8192) - address: Memory address of the function (hex string) - + address: Memory address in hex format + Returns: - List of strings showing assembly instructions with addresses and comments + dict: Contains assembly instructions with addresses and comments """ return safe_get(port, "disassemble_function", {"address": address}) @mcp.tool() def set_decompiler_comment(port: int = DEFAULT_GHIDRA_PORT, address: str = "", comment: str = "") -> str: - """Add/edit a comment in the decompiler view at a specific address - + """Add/edit decompiler comment at address + Args: port: Ghidra instance port (default: 8192) - address: Memory address to place comment (hex string) - comment: Text of the comment to add - + address: Memory address in hex format + comment: Comment text to add + Returns: - Confirmation message or error if failed + str: Confirmation message or error """ return safe_post(port, "set_decompiler_comment", {"address": address, "comment": comment}) @mcp.tool() def set_disassembly_comment(port: int = DEFAULT_GHIDRA_PORT, address: str = "", comment: str = "") -> str: - """Add/edit a comment in the disassembly view at a specific address - + """Add/edit disassembly comment at address + Args: port: Ghidra instance port (default: 8192) - address: Memory address to place comment (hex string) - comment: Text of the comment to add - + address: Memory address in hex format + comment: Comment text to add + Returns: - Confirmation message or error if failed + str: Confirmation message or error """ return safe_post(port, "set_disassembly_comment", {"address": address, "comment": comment}) @mcp.tool() def rename_local_variable(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", old_name: str = "", new_name: str = "") -> str: - """Rename a local variable within a function - + """Rename local variable in function + Args: port: Ghidra instance port (default: 8192) - function_address: Memory address of the function (hex string) - old_name: Current name of the variable - new_name: New name for the variable - + function_address: Function memory address in hex + old_name: Current variable name + new_name: New variable name + Returns: - Confirmation message or error if failed + str: Confirmation message or error """ return safe_post(port, "rename_local_variable", {"functionAddress": function_address, "oldName": old_name, "newName": new_name}) @mcp.tool() def rename_function_by_address(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", new_name: str = "") -> str: - """Rename a function at a specific memory address - + """Rename function at memory address + Args: port: Ghidra instance port (default: 8192) - function_address: Memory address of the function (hex string) - new_name: New name for the function - + function_address: Function memory address in hex + new_name: New function name + Returns: - Confirmation message or error if failed + str: Confirmation message or error """ return safe_post(port, "rename_function_by_address", {"functionAddress": function_address, "newName": new_name}) @mcp.tool() def set_function_prototype(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", prototype: str = "") -> str: - """Update a function's signature/prototype - + """Update function signature/prototype + Args: port: Ghidra instance port (default: 8192) - function_address: Memory address of the function (hex string) - prototype: New function prototype string (e.g. "int func(int param1)") - + function_address: Function memory address in hex + prototype: New prototype string (e.g. "int func(int param1)") + Returns: - Confirmation message or error if failed + str: Confirmation message or error """ return safe_post(port, "set_function_prototype", {"functionAddress": function_address, "prototype": prototype}) @mcp.tool() def set_local_variable_type(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", variable_name: str = "", new_type: str = "") -> str: - """Change the data type of a local variable in a function - + """Change local variable data type + Args: port: Ghidra instance port (default: 8192) - function_address: Memory address of the function (hex string) - variable_name: Name of the variable to modify - new_type: New data type for the variable (e.g. "int", "char*") - + function_address: Function memory address in hex + variable_name: Variable name to modify + new_type: New data type (e.g. "int", "char*") + Returns: - Confirmation message or error if failed + str: Confirmation message or error """ return safe_post(port, "set_local_variable_type", {"functionAddress": function_address, "variableName": variable_name, "newType": new_type}) @@ -688,11 +681,11 @@ def list_variables(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) - limit: Maximum number of variables to return (default: 100) - search: Optional search string to filter variables by name + limit: Maximum items to return (default: 100) + search: Optional filter for variable names Returns: - Dict containing the list of variables in the 'result' field + dict: Contains variables list in 'result' field """ params = {"offset": offset, "limit": limit} if search: @@ -700,73 +693,25 @@ def list_variables(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int response = safe_get(port, "variables", params) - # Check if the response is a string (old format) or already a dict with proper structure - if isinstance(response, dict) and "success" in response: - # If it's already a properly structured response, return it - return response - elif isinstance(response, str): - # If it's a string (old format), parse it and wrap it in a proper structure - # For empty response, return empty list - if not response.strip(): - return { - "success": True, - "result": [], - "timestamp": int(time.time() * 1000), - "port": port - } - - # Parse the string to extract variables - variables = [] - lines = response.strip().split('\n') - - for line in lines: - line = line.strip() - if line: - # Try to parse variable line - parts = line.split(':') - if len(parts) >= 2: - var_name = parts[0].strip() - var_type = ':'.join(parts[1:]).strip() - - # Extract address if present - address = "" - if '@' in var_type: - type_parts = var_type.split('@') - var_type = type_parts[0].strip() - address = type_parts[1].strip() - - variables.append({ - "name": var_name, - "dataType": var_type, - "address": address - }) - - # Return structured response - return { - "success": True, - "result": variables, - "timestamp": int(time.time() * 1000), - "port": port - } - else: - # Unexpected format, return an error + if not isinstance(response, dict) or "success" not in response: return { "success": False, - "error": "Unexpected response format from Ghidra plugin", + "error": "Invalid response format from Ghidra plugin", "timestamp": int(time.time() * 1000), "port": port } + return response @mcp.tool() def list_function_variables(port: int = DEFAULT_GHIDRA_PORT, function: str = "") -> dict: - """List variables in a specific function - + """List variables in function + Args: port: Ghidra instance port (default: 8192) - function: Name of the function to list variables for - + function: Function name to list variables for + Returns: - Dict containing the function variables in the 'result.variables' field + dict: Contains variables list in 'result.variables' """ if not function: return {"success": False, "error": "Function name is required"} @@ -774,83 +719,27 @@ def list_function_variables(port: int = DEFAULT_GHIDRA_PORT, function: str = "") encoded_name = quote(function) response = safe_get(port, f"functions/{encoded_name}/variables", {}) - # Check if the response is a string (old format) or already a dict with proper structure - if isinstance(response, dict) and "success" in response: - # If it's already a properly structured response, return it - return response - elif isinstance(response, str): - # If it's a string (old format), parse it and wrap it in a proper structure - # Example string format: "Function: init_peripherals\n\nParameters:\n none\n\nLocal Variables:\n powArrThree: undefined * @ 08000230\n pvartwo: undefined * @ 08000212\n pvarEins: undefined * @ 08000206\n" - - # Parse the string to extract variables - variables = [] - lines = response.strip().split('\n') - - # Extract function name from first line if possible - function_name = function - if lines and lines[0].startswith("Function:"): - function_name = lines[0].replace("Function:", "").strip() - - # Look for local variables section - in_local_vars = False - for line in lines: - line = line.strip() - if line == "Local Variables:": - in_local_vars = True - continue - - if in_local_vars and line and not line.startswith("Function:") and not line.startswith("Parameters:"): - # Parse variable line: " varName: type @ address" - parts = line.strip().split(':') - if len(parts) >= 2: - var_name = parts[0].strip() - var_type = ':'.join(parts[1:]).strip() - - # Extract address if present - address = "" - if '@' in var_type: - type_parts = var_type.split('@') - var_type = type_parts[0].strip() - address = type_parts[1].strip() - - variables.append({ - "name": var_name, - "dataType": var_type, - "address": address, - "type": "local" - }) - - # Return structured response - return { - "success": True, - "result": { - "function": function_name, - "variables": variables - }, - "timestamp": int(time.time() * 1000), - "port": port - } - else: - # Unexpected format, return an error + if not isinstance(response, dict) or "success" not in response: return { "success": False, - "error": "Unexpected response format from Ghidra plugin", + "error": "Invalid response format from Ghidra plugin", "timestamp": int(time.time() * 1000), "port": port } + return response @mcp.tool() def rename_variable(port: int = DEFAULT_GHIDRA_PORT, function: str = "", name: str = "", new_name: str = "") -> dict: - """Rename a variable in a function + """Rename variable in function Args: port: Ghidra instance port (default: 8192) - function: Name of the function containing the variable - name: Current name of the variable - new_name: New name for the variable + function: Function name containing variable + name: Current variable name + new_name: New variable name Returns: - Dict containing the result of the operation + dict: Operation result """ if not function or not name or not new_name: return {"success": False, "error": "Function, name, and new_name parameters are required"} @@ -861,16 +750,16 @@ def rename_variable(port: int = DEFAULT_GHIDRA_PORT, function: str = "", name: s @mcp.tool() def retype_variable(port: int = DEFAULT_GHIDRA_PORT, function: str = "", name: str = "", data_type: str = "") -> dict: - """Change the data type of a variable in a function + """Change variable data type in function Args: port: Ghidra instance port (default: 8192) - function: Name of the function containing the variable - name: Current name of the variable - data_type: New data type for the variable + function: Function name containing variable + name: Variable name to modify + data_type: New data type Returns: - Dict containing the result of the operation + dict: Operation result """ if not function or not name or not data_type: return {"success": False, "error": "Function, name, and data_type parameters are required"} @@ -886,10 +775,8 @@ def periodic_discovery(): """Periodically discover new instances""" while True: try: - # Use the full discovery range _discover_instances(FULL_DISCOVERY_RANGE, timeout=0.5) - # Also check if any existing instances are down with instances_lock: ports_to_remove = [] for port, info in active_instances.items(): @@ -901,24 +788,19 @@ def periodic_discovery(): except requests.exceptions.RequestException: ports_to_remove.append(port) - # Remove any instances that are down for port in ports_to_remove: del active_instances[port] print(f"Removed unreachable instance on port {port}") except Exception as e: print(f"Error in periodic discovery: {e}") - # Sleep for 30 seconds before next scan time.sleep(30) if __name__ == "__main__": - # Auto-register default instance register_instance(DEFAULT_GHIDRA_PORT, f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}") - # Auto-discover other instances discover_instances() - # Start periodic discovery in background thread discovery_thread = threading.Thread( target=periodic_discovery, daemon=True,