From 88e1fe6ca8d35ded2f20e7da84fc11f0240fa3fe Mon Sep 17 00:00:00 2001
From: Ryan Malloy
Date: Mon, 26 Jan 2026 13:11:45 -0700
Subject: [PATCH] feat: Add headless HTTP server and entrypoint scripts

- Add GhydraMCPServer.py with fixed strings endpoint (Jython compatible)
- Fix strings endpoint to iterate through defined data instead of using
  DefinedDataIterator.definedStrings() which isn't accessible in Jython
- Add entrypoint.sh for Docker container initialization
---
 docker/GhydraMCPServer.py | 387 ++++++++++++++++++++++++++++++++++++++
 docker/entrypoint.sh      | 147 ++++++++++++++++
 2 files changed, 534 insertions(+)
 create mode 100644 docker/GhydraMCPServer.py
 create mode 100755 docker/entrypoint.sh

diff --git a/docker/GhydraMCPServer.py b/docker/GhydraMCPServer.py
new file mode 100644
index 0000000..142f591
--- /dev/null
+++ b/docker/GhydraMCPServer.py
@@ -0,0 +1,387 @@
+# GhydraMCPServer.py - Headless Ghidra script for GhydraMCP HTTP API
+# Python/Jython scripts don't require OSGi bundle registration
+#
+# Usage: analyzeHeadless PROJECT_DIR PROJECT_NAME -import BINARY \
+#            -postScript GhydraMCPServer.py [port]
+#
+#@category GhydraMCP
+#@keybinding
+#@menupath
+#@toolbar
+
+from com.sun.net.httpserver import HttpServer, HttpHandler
+from java.net import InetSocketAddress
+from java.util.concurrent import Executors
+from java.io import OutputStream
+from ghidra.app.decompiler import DecompInterface
+from ghidra.program.model.listing import Function
+import json
+import threading
+
+API_VERSION = 2  # Integer for MCP client compatibility (minimum expected: 2)
+API_VERSION_STRING = "2.1"
+DEFAULT_PORT = 8192
+
+class GhydraMCPHandler(HttpHandler):
+    """Serve the GhydraMCP JSON API for a single program over HTTP."""
+
+    def __init__(self, program, decompiler):
+        """Store the program and decompiler interface used by all requests."""
+        self.program = program
+        # NOTE(review): run_server() registers this one handler with a cached
+        # thread pool, so every request thread shares this decompiler - confirm
+        # DecompInterface tolerates concurrent decompileFunction() calls.
+        self.decompiler = decompiler
+
+    def handle(self, exchange):
+        """Route one HTTP exchange by request path and send a JSON reply.
+
+        Unknown paths are answered with 404 and an exception raised while
+        handling the request with 500. Every other reply is sent with 200,
+        including handler results that report {"success": False}.
+        """
+        try:
+            path = exchange.getRequestURI().getPath()
+            status = 200
+
+            # Route to appropriate handler
+            if path == "/" or path == "":
+                response = self.handle_root()
+            elif path == "/functions":
+                response = self.handle_functions()
+            elif path == "/decompile":
+                # Checked before the endswith() route below, which would
+                # otherwise also match the bare "/decompile" path.
+                response = self.handle_decompile(exchange)
+            elif path.endswith("/decompile"):
+                # Handle /functions/{address}/decompile
+                response = self.handle_decompile_by_path(path)
+            elif path.startswith("/functions/"):
+                response = self.handle_function_detail(path)
+            elif path == "/strings" or path == "/data/strings":
+                response = self.handle_strings()
+            elif path == "/info":
+                response = self.handle_info()
+            else:
+                status = 404
+                response = {"success": False, "error": "Not found", "path": path}
+
+            self.send_response(exchange, status, response)
+        except Exception as e:
+            self.send_response(exchange, 500, {"success": False, "error": str(e)})
+
+    def send_response(self, exchange, code, data):
+        """Send data as an indented JSON body with HTTP status code."""
+        response_bytes = json.dumps(data, indent=2).encode('utf-8')
+        headers = exchange.getResponseHeaders()
+        headers.set("Content-Type", "application/json")
+        headers.set("Access-Control-Allow-Origin", "*")
+        exchange.sendResponseHeaders(code, len(response_bytes))
+        # Not named "os", which would shadow the standard module name.
+        body = exchange.getResponseBody()
+        try:
+            body.write(response_bytes)
+        finally:
+            # Close the stream even if the write fails.
+            body.close()
+
+    def handle_root(self):
+        """Return the API banner: version, program name and endpoint list."""
+        return {
+            "success": True,
+            "api_version": API_VERSION,
+            "message": "GhydraMCP API " + API_VERSION_STRING,
+            "program": self.program.getName() if self.program else None,
+            "endpoints": ["/", "/info", "/functions", "/functions/", "/strings", "/decompile"],
+            "_links": {
+                "self": "/",
+                "functions": "/functions",
+                "strings": "/strings",
+                "info": "/info"
+            }
+        }
+
+    def handle_info(self):
+        """Return name, path, language and address details of the program."""
+        if not self.program:
+            return {"success": False, "error": "No program loaded"}
+        return {
+            "success": True,
+            "name": self.program.getName(),
+            "path": self.program.getExecutablePath(),
+            "language": str(self.program.getLanguage().getLanguageID()),
+            "processor": str(self.program.getLanguage().getProcessor()),
+            "addressSize": self.program.getAddressFactory().getDefaultAddressSpace().getSize(),
+            "imageBase": str(self.program.getImageBase()),
+        }
+
+    def handle_functions(self):
+        """List name, entry address and signature of up to 10000 functions."""
+        if not self.program:
+            return {"success": False, "error": "No program loaded"}
+
+        functions = []
+        fm = self.program.getFunctionManager()
+        for func in fm.getFunctions(True):  # True = forward iteration
+            functions.append({
+                "name": func.getName(),
+                "address": str(func.getEntryPoint()),
+                "signature": str(func.getSignature()),
+            })
+            if len(functions) >= 10000:  # Higher limit for MCP client
+                break
+
+        return {
+            "success": True,
+            "size": len(functions),
+            "result": functions,  # MCP client expects "result" key
+        }
+
+    def handle_function_detail(self, path):
+        """Describe the function named by the path segment after /functions/.
+
+        The segment is matched against function names first and is only
+        parsed as an entry-point address when no name matches.
+        """
+        if not self.program:
+            return {"success": False, "error": "No program loaded"}
+
+        # Extract function name or address from path
+        parts = path.split("/")
+        if len(parts) < 3:
+            return {"success": False, "error": "Invalid path"}
+
+        name_or_addr = parts[2]
+        fm = self.program.getFunctionManager()
+
+        # Try to find by name first
+        func = None
+        for f in fm.getFunctions(True):
+            if f.getName() == name_or_addr:
+                func = f
+                break
+
+        # If not found, try by address
+        if not func:
+            try:
+                addr = self.program.getAddressFactory().getAddress(name_or_addr)
+                func = fm.getFunctionAt(addr)
+            except:
+                # Bare except kept on purpose: any lookup failure means
+                # "not found". NOTE(review): presumably this must also trap
+                # Java exceptions under Jython - confirm before narrowing.
+                pass
+
+        if not func:
+            return {"success": False, "error": "Function not found: " + name_or_addr}
+
+        return {
+            "success": True,
+            "name": func.getName(),
+            "address": str(func.getEntryPoint()),
+            "signature": str(func.getSignature()),
+            "body": str(func.getBody()),
+            "callingConvention": func.getCallingConventionName(),
+            "parameterCount": func.getParameterCount(),
+        }
+
+    def handle_strings(self):
+        """List up to 5000 defined string values with address and length."""
+        if not self.program:
+            return {"success": False, "error": "No program loaded"}
+
+        strings = []
+        listing = self.program.getListing()
+
+        # Iterate through all defined data and filter for string types
+        for data in listing.getDefinedData(True):  # True = forward iteration
+            try:
+                dt = data.getDataType()
+                if not dt:
+                    continue
+
+                # Check if data type is a string variant
+                type_name = dt.getName().lower()
+                if "string" in type_name or type_name in ("char", "wchar"):
+                    value = data.getValue()
+                    if value:
+                        str_val = str(value)
+                        if len(str_val) > 1:  # Skip single chars
+                            strings.append({
+                                "address": str(data.getAddress()),
+                                "value": str_val[:200],  # Truncate long strings
+                                "length": len(str_val)
+                            })
+                if len(strings) >= 5000:
+                    break
+            except:
+                # Best effort: skip any item that cannot be read or converted.
+                pass
+
+        return {
+            "success": True,
+            "size": len(strings),
+            "result": strings,  # MCP client expects "result" key
+        }
+
+    def handle_decompile_by_path(self, path):
+        """Handle /functions/{address}/decompile or /functions/by-name/{name}/decompile"""
+        if not self.program:
+            return {"success": False, "error": "No program loaded"}
+
+        # Parse address or name from path
+        # /functions/000496e8/decompile or /functions/by-name/main/decompile
+        parts = path.split("/")
+
+        func = None
+        fm = self.program.getFunctionManager()
+
+        if "by-name" in path and len(parts) >= 4:
+            # /functions/by-name/{name}/decompile
+            name = parts[3]
+            for f in fm.getFunctions(True):
+                if f.getName() == name:
+                    func = f
+                    break
+        elif len(parts) >= 3:
+            # /functions/{address}/decompile
+            addr_str = parts[2]
+            try:
+                addr = self.program.getAddressFactory().getAddress(addr_str)
+                func = fm.getFunctionAt(addr)
+            except:
+                # Best effort: an unparsable address is reported as not found.
+                pass
+
+        if not func:
+            return {"success": False, "error": "Function not found from path: " + path}
+
+        # Decompile
+        try:
+            result = self.decompiler.decompileFunction(func, 30, getMonitor())
+            if result and result.decompileCompleted():
+                code = result.getDecompiledFunction().getC()
+                return {
+                    "success": True,
+                    "result": {
+                        "name": func.getName(),
+                        "address": str(func.getEntryPoint()),
+                        "decompiled_text": code,
+                        "ccode": code,
+                    }
+                }
+            else:
+                return {"success": False, "error": "Decompilation failed"}
+        except Exception as e:
+            return {"success": False, "error": str(e)}
+
+    def handle_decompile(self, exchange):
+        """Decompile the function given by the name or address query parameter."""
+        if not self.program:
+            return {"success": False, "error": "No program loaded"}
+
+        # Get function name from query params
+        query = exchange.getRequestURI().getQuery()
+        if not query:
+            return {"success": False, "error": "Missing 'name' or 'address' parameter"}
+
+        params = {}
+        for part in query.split("&"):
+            if "=" in part:
+                k, v = part.split("=", 1)
+                params[k] = v
+
+        name = params.get("name") or params.get("address")
+        if not name:
+            return {"success": False, "error": "Missing 'name' or 'address' parameter"}
+
+        # Find function
+        fm = self.program.getFunctionManager()
+        func = None
+        for f in fm.getFunctions(True):
+            if f.getName() == name:
+                func = f
+                break
+
+        if not func:
+            try:
+                addr = self.program.getAddressFactory().getAddress(name)
+                func = fm.getFunctionAt(addr)
+            except:
+                # Best effort: an unparsable address is reported as not found.
+                pass
+
+        if not func:
+            return {"success": False, "error": "Function not found: " + name}
+
+        # Decompile
+        try:
+            result = self.decompiler.decompileFunction(func, 30, getMonitor())
+            if result and result.decompileCompleted():
+                code = result.getDecompiledFunction().getC()
+                return {
+                    "success": True,
+                    "name": func.getName(),
+                    "address": str(func.getEntryPoint()),
+                    "decompiled": code
+                }
+            else:
+                return {"success": False, "error": "Decompilation failed"}
+        except Exception as e:
+            return {"success": False, "error": str(e)}
+
+
+def run_server(port, program, decompiler):
+    """Start the HTTP server on port and return the running HttpServer."""
+    server = HttpServer.create(InetSocketAddress(port), 0)
+    server.createContext("/", GhydraMCPHandler(program, decompiler))
+    server.setExecutor(Executors.newCachedThreadPool())
+    server.start()
+    println("GhydraMCP HTTP server started on port " + str(port))
+    return server
+
+
+# Main script execution
+def main():
+    """Parse the optional port argument, start the server and block."""
+    port = DEFAULT_PORT
+
+    # Parse port from script arguments
+    args = getScriptArgs()
+    if args and len(args) > 0:
+        try:
+            port = int(args[0])
+        except (TypeError, ValueError):
+            println("Invalid port number, using default: " + str(DEFAULT_PORT))
+
+    # Initialize decompiler
+    decompiler = DecompInterface()
+    decompiler.openProgram(currentProgram)
+
+    println("=========================================")
+    println(" GhydraMCP Headless HTTP Server")
+    println("=========================================")
+    println(" API Version: " + API_VERSION_STRING + " (compat: " + str(API_VERSION) + ")")
+    println(" Port: " + str(port))
+    println(" Program: " + (currentProgram.getName() if currentProgram else "None"))
+    println(" Script: Python/Jython")
+    println("=========================================")
+
+    server = run_server(port, currentProgram, decompiler)
+
+    println("")
+    println("GhydraMCP Server running. Press Ctrl+C to stop.")
+    println("API available at: http://localhost:" + str(port) + "/")
+
+    # Keep the script running
+    import time
+    try:
+        while True:
+            time.sleep(1)
+    except KeyboardInterrupt:
+        server.stop(0)
+        println("Server stopped.")
+
+
+# Run the main function
+main()
diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh
new file mode 100755
index 0000000..014b2a2
--- /dev/null
+++ b/docker/entrypoint.sh
@@ -0,0 +1,147 @@
+#!/bin/bash
+# GhydraMCP Docker Entrypoint
+# Starts Ghidra in headless mode with HTTP API server
+
+set -e
+
+GHYDRA_MODE=${GHYDRA_MODE:-headless}
+GHYDRA_PORT=${GHYDRA_PORT:-8192}
+GHYDRA_MAXMEM=${GHYDRA_MAXMEM:-2G}
+# NOTE(review): GHYDRA_MAXMEM is only echoed in the banner below; no command
+# in this script passes it to analyzeHeadless - confirm it takes effect.
+GHIDRA_HOME=${GHIDRA_HOME:-/opt/ghidra}
+# User scripts directory - Python scripts don't need OSGi bundle registration
+SCRIPT_DIR=${SCRIPT_DIR:-/home/ghidra/ghidra_scripts}
+
+# Project settings
+PROJECT_DIR=${PROJECT_DIR:-/projects}
+PROJECT_NAME=${PROJECT_NAME:-GhydraMCP}
+
+echo "=============================================="
+echo " GhydraMCP Docker Container"
+echo "=============================================="
+echo " Mode: ${GHYDRA_MODE}"
+echo " Port: ${GHYDRA_PORT}"
+echo " Memory: ${GHYDRA_MAXMEM}"
+echo " Project: ${PROJECT_DIR}/${PROJECT_NAME}"
+echo "=============================================="
+
+# Ensure directories exist
+mkdir -p "${PROJECT_DIR}"
+
+# Handle different modes
+case "${GHYDRA_MODE}" in
+    headless)
+        # Headless mode: Import a binary and start HTTP server
+
+        if [ $# -eq 0 ]; then
+            echo ""
+            echo "Usage: docker run ghydramcp:latest [binary_path] [options]"
+            echo ""
+            echo "Examples:"
+            echo " # Analyze a binary mounted at /binaries/sample.exe"
+            echo " docker run -p 8192:8192 -v ./samples:/binaries ghydramcp /binaries/sample.exe"
+            echo ""
+            echo " # With custom project name"
+            echo " docker run -p 8192:8192 -v ./samples:/binaries -e PROJECT_NAME=malware ghydramcp /binaries/sample.exe"
+            echo ""
+            echo "Environment variables:"
+            echo " GHYDRA_PORT - HTTP API port (default: 8192)"
+            echo " GHYDRA_MAXMEM - Max JVM heap (default: 2G)"
+            echo " PROJECT_NAME - Ghidra project name (default: GhydraMCP)"
+            echo " PROJECT_DIR - Project directory (default: /projects)"
+            echo ""
+            echo "Starting in wait mode..."
+            echo "Container will stay running for debugging or manual operation."
+            echo "You can exec into this container to run analyzeHeadless manually."
+            echo ""
+
+            # Keep container alive for debugging/manual operation
+            tail -f /dev/null
+        else
+            BINARY_PATH="$1"
+            shift
+
+            if [ ! -f "${BINARY_PATH}" ]; then
+                echo "ERROR: Binary not found: ${BINARY_PATH}"
+                echo "Make sure to mount the binary directory with -v /host/path:/binaries"
+                exit 1
+            fi
+
+            BINARY_NAME=$(basename "${BINARY_PATH}")
+            echo "Importing and analyzing: ${BINARY_NAME}"
+            echo ""
+
+            # Build the analyzeHeadless command
+            ANALYZE_CMD="${GHIDRA_HOME}/support/analyzeHeadless"
+            ANALYZE_ARGS=(
+                "${PROJECT_DIR}"
+                "${PROJECT_NAME}"
+                -import "${BINARY_PATH}"
+                -max-cpu 2
+                -scriptPath "${SCRIPT_DIR}"
+                -postScript "GhydraMCPServer.py" "${GHYDRA_PORT}"
+            )
+
+            # Add any extra arguments passed
+            ANALYZE_ARGS+=("$@")
+
+            echo "Running: ${ANALYZE_CMD} ${ANALYZE_ARGS[*]}"
+            echo ""
+
+            exec "${ANALYZE_CMD}" "${ANALYZE_ARGS[@]}"
+        fi
+        ;;
+
+    server)
+        # Server mode: Open existing project with HTTP server
+        echo "Starting GhydraMCP server on existing project..."
+
+        if [ $# -eq 0 ]; then
+            echo "Usage: docker run -e GHYDRA_MODE=server ghydramcp [program_name]"
+            echo ""
+            echo " program_name: Name of program in the project to open"
+            exit 1
+        fi
+
+        PROGRAM_NAME="$1"
+        shift
+
+        exec "${GHIDRA_HOME}/support/analyzeHeadless" \
+            "${PROJECT_DIR}" "${PROJECT_NAME}" \
+            -process "${PROGRAM_NAME}" \
+            -noanalysis \
+            -scriptPath "${SCRIPT_DIR}" \
+            -postScript "GhydraMCPServer.py" "${GHYDRA_PORT}" \
+            "$@"
+        ;;
+
+    analyze)
+        # Analyze mode: Import and analyze, then exit (no HTTP server)
+        if [ $# -eq 0 ]; then
+            echo "Usage: docker run -e GHYDRA_MODE=analyze ghydramcp [binary_path]"
+            exit 1
+        fi
+
+        BINARY_PATH="$1"
+        shift
+
+        echo "Analyzing binary: ${BINARY_PATH}"
+        exec "${GHIDRA_HOME}/support/analyzeHeadless" \
+            "${PROJECT_DIR}" "${PROJECT_NAME}" \
+            -import "${BINARY_PATH}" \
+            -max-cpu 2 \
+            "$@"
+        ;;
+
+    shell)
+        # Interactive shell
+        exec /bin/bash
+        ;;
+
+    *)
+        echo "Unknown mode: ${GHYDRA_MODE}"
+        echo "Valid modes: headless, server, analyze, shell"
+        exit 1
+        ;;
+esac