From 41bfa40d3a4decfb3433a989da5354fde75c202b Mon Sep 17 00:00:00 2001 From: Teal Bauer Date: Sun, 13 Apr 2025 09:38:13 +0200 Subject: [PATCH] Fix bridge error --- bridge_mcp_hydra.py | 138 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 128 insertions(+), 10 deletions(-) diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index 66171e9..ec8c2c3 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -302,7 +302,7 @@ def unregister_instance(port: int) -> str: @mcp.tool() -def discover_instances(host: str = null) -> dict: +def discover_instances(host: str = None) -> dict: """Discover available Ghidra instances by scanning ports Args: @@ -341,6 +341,65 @@ def _discover_instances(port_range, host=None, timeout=0.5) -> dict: } +@mcp.tool() +def list_programs(port: int = DEFAULT_GHIDRA_PORT, + offset: int = 0, + limit: int = 100, + project: str = None) -> dict: + """List all programs across all projects with filtering and pagination + + Args: + port: Ghidra instance port (default: 8192) + offset: Pagination offset (default: 0) + limit: Maximum items to return (default: 100) + project: Filter by project name + + Returns: + dict: { + "result": list of program info objects, + "size": total count, + "offset": current offset, + "limit": current limit, + "_links": pagination links + } + """ + params = { + "offset": offset, + "limit": limit + } + if project: + params["project"] = project + + response = safe_get(port, "programs", params) + if isinstance(response, dict) and "error" in response: + return response + + return { + "result": response.get("result", []), + "size": response.get("size", len(response.get("result", []))), + "offset": offset, + "limit": limit, + "_links": response.get("_links", {}) + } + + +@mcp.tool() +def get_current_program(port: int = DEFAULT_GHIDRA_PORT) -> dict: + """Get details about the currently loaded program + + Args: + port: Ghidra instance port (default: 8192) + + Returns: + dict: Program information including name, ID, language, etc. + """ + response = safe_get(port, "programs/current") + if isinstance(response, dict) and "error" in response: + return response + + return response + + @mcp.tool() def list_functions(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, @@ -797,7 +856,25 @@ def get_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "") Returns: dict: Contains function name, address, signature and decompilation """ - return safe_get(port, "get_function_by_address", {"address": address}) + if not address: + return { + "success": False, + "error": "Address parameter is required", + "timestamp": int(time.time() * 1000) + } + + # Use the HATEOAS endpoint + response = safe_get(port, f"programs/current/functions/{address}") + + # Format the response for consistency + if isinstance(response, dict) and "success" in response and response["success"]: + # Add timestamp if not present + if "timestamp" not in response: + response["timestamp"] = int(time.time() * 1000) + # Add port for tracking + response["port"] = port + + return response @mcp.tool() @@ -973,14 +1050,37 @@ def decompile_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str simplificationStyle: Decompiler style (default: "normalize") Returns: - dict: Contains decompiled code in 'result.decompilation' + dict: Contains decompiled code and function information """ - return safe_get(port, "decompile_function", { - "address": address, - "cCode": str(cCode).lower(), - "syntaxTree": str(syntaxTree).lower(), - "simplificationStyle": simplificationStyle - }) + if not address: + return { + "success": False, + "error": "Address parameter is required", + "timestamp": int(time.time() * 1000) + } + + # Use the HATEOAS endpoint + params = { + "syntax_tree": str(syntaxTree).lower(), + "style": simplificationStyle + } + + response = safe_get(port, f"programs/current/functions/{address}/decompile", params) + + # Format the response for consistency + if isinstance(response, dict) and "success" in response and response["success"]: + # Add timestamp if not present + if "timestamp" not in response: + response["timestamp"] = int(time.time() * 1000) + # Add port for tracking + response["port"] = port + + # Ensure the result has a decompilation field for backward compatibility + if "result" in response and isinstance(response["result"], dict): + if "ccode" in response["result"] and "decompilation" not in response["result"]: + response["result"]["decompilation"] = response["result"]["ccode"] + + return response @mcp.tool() @@ -994,7 +1094,25 @@ def disassemble_function(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> Returns: dict: Contains assembly instructions with addresses and comments """ - return safe_get(port, "disassemble_function", {"address": address}) + if not address: + return { + "success": False, + "error": "Address parameter is required", + "timestamp": int(time.time() * 1000) + } + + # Use the HATEOAS endpoint + response = safe_get(port, f"programs/current/functions/{address}/disassembly") + + # Format the response for consistency + if isinstance(response, dict) and "success" in response and response["success"]: + # Add timestamp if not present + if "timestamp" not in response: + response["timestamp"] = int(time.time() * 1000) + # Add port for tracking + response["port"] = port + + return response @mcp.tool()