mcghidra/docker/GhydraMCPServer.py
Ryan Malloy 88e1fe6ca8
Some checks are pending
Build Ghidra Plugin / build (push) Waiting to run
feat: Add headless HTTP server and entrypoint scripts
- Add GhydraMCPServer.py with fixed strings endpoint (Jython compatible)
- Fix strings endpoint to iterate through defined data instead of using
  DefinedDataIterator.definedStrings() which isn't accessible in Jython
- Add entrypoint.sh for Docker container initialization
2026-01-26 13:11:45 -07:00

349 lines
12 KiB
Python

# GhydraMCPServer.py - Headless Ghidra script for GhydraMCP HTTP API
# Python/Jython scripts don't require OSGi bundle registration
#
# Usage: analyzeHeadless <project> <name> -import <binary> -postScript GhydraMCPServer.py [port]
#
#@category GhydraMCP
#@keybinding
#@menupath
#@toolbar
from com.sun.net.httpserver import HttpServer, HttpHandler
from java.net import InetSocketAddress
from java.util.concurrent import Executors
from java.io import OutputStream
from ghidra.app.decompiler import DecompInterface
from ghidra.program.model.listing import Function
import json
import threading
API_VERSION = 2 # Integer for MCP client compatibility (minimum expected: 2)
API_VERSION_STRING = "2.1"
DEFAULT_PORT = 8192
class GhydraMCPHandler(HttpHandler):
def __init__(self, program, decompiler):
self.program = program
self.decompiler = decompiler
def handle(self, exchange):
try:
path = exchange.getRequestURI().getPath()
method = exchange.getRequestMethod()
# Route to appropriate handler
if path == "/" or path == "":
response = self.handle_root()
elif path == "/functions":
response = self.handle_functions()
elif path.endswith("/decompile"):
# Handle /functions/{address}/decompile
response = self.handle_decompile_by_path(path)
elif path.startswith("/functions/"):
response = self.handle_function_detail(path)
elif path == "/strings" or path == "/data/strings":
response = self.handle_strings()
elif path == "/info":
response = self.handle_info()
elif path == "/decompile":
response = self.handle_decompile(exchange)
else:
response = {"success": False, "error": "Not found", "path": path}
self.send_response(exchange, 200, response)
except Exception as e:
self.send_response(exchange, 500, {"success": False, "error": str(e)})
def send_response(self, exchange, code, data):
response_bytes = json.dumps(data, indent=2).encode('utf-8')
exchange.getResponseHeaders().set("Content-Type", "application/json")
exchange.getResponseHeaders().set("Access-Control-Allow-Origin", "*")
exchange.sendResponseHeaders(code, len(response_bytes))
os = exchange.getResponseBody()
os.write(response_bytes)
os.close()
def handle_root(self):
return {
"success": True,
"api_version": API_VERSION,
"message": "GhydraMCP API " + API_VERSION_STRING,
"program": self.program.getName() if self.program else None,
"endpoints": ["/", "/info", "/functions", "/functions/<name>", "/strings", "/decompile"],
"_links": {
"self": "/",
"functions": "/functions",
"strings": "/strings",
"info": "/info"
}
}
def handle_info(self):
if not self.program:
return {"success": False, "error": "No program loaded"}
return {
"success": True,
"name": self.program.getName(),
"path": self.program.getExecutablePath(),
"language": str(self.program.getLanguage().getLanguageID()),
"processor": str(self.program.getLanguage().getProcessor()),
"addressSize": self.program.getAddressFactory().getDefaultAddressSpace().getSize(),
"imageBase": str(self.program.getImageBase()),
}
def handle_functions(self):
if not self.program:
return {"success": False, "error": "No program loaded"}
functions = []
fm = self.program.getFunctionManager()
for func in fm.getFunctions(True): # True = forward iteration
functions.append({
"name": func.getName(),
"address": str(func.getEntryPoint()),
"signature": str(func.getSignature()),
})
if len(functions) >= 10000: # Higher limit for MCP client
break
return {
"success": True,
"size": len(functions),
"result": functions, # MCP client expects "result" key
}
def handle_function_detail(self, path):
if not self.program:
return {"success": False, "error": "No program loaded"}
# Extract function name or address from path
parts = path.split("/")
if len(parts) < 3:
return {"success": False, "error": "Invalid path"}
name_or_addr = parts[2]
fm = self.program.getFunctionManager()
# Try to find by name first
func = None
for f in fm.getFunctions(True):
if f.getName() == name_or_addr:
func = f
break
# If not found, try by address
if not func:
try:
addr = self.program.getAddressFactory().getAddress(name_or_addr)
func = fm.getFunctionAt(addr)
except:
pass
if not func:
return {"success": False, "error": "Function not found: " + name_or_addr}
return {
"success": True,
"name": func.getName(),
"address": str(func.getEntryPoint()),
"signature": str(func.getSignature()),
"body": str(func.getBody()),
"callingConvention": func.getCallingConventionName(),
"parameterCount": func.getParameterCount(),
}
def handle_strings(self):
if not self.program:
return {"success": False, "error": "No program loaded"}
strings = []
listing = self.program.getListing()
# Iterate through all defined data and filter for string types
for data in listing.getDefinedData(True): # True = forward iteration
try:
dt = data.getDataType()
if not dt:
continue
# Check if data type is a string variant
type_name = dt.getName().lower()
if "string" in type_name or type_name in ("char", "wchar"):
value = data.getValue()
if value:
str_val = str(value)
if len(str_val) > 1: # Skip single chars
strings.append({
"address": str(data.getAddress()),
"value": str_val[:200], # Truncate long strings
"length": len(str_val)
})
if len(strings) >= 5000:
break
except:
pass
return {
"success": True,
"size": len(strings),
"result": strings, # MCP client expects "result" key
}
def handle_decompile_by_path(self, path):
"""Handle /functions/{address}/decompile or /functions/by-name/{name}/decompile"""
if not self.program:
return {"success": False, "error": "No program loaded"}
# Parse address or name from path
# /functions/000496e8/decompile or /functions/by-name/main/decompile
parts = path.split("/")
func = None
fm = self.program.getFunctionManager()
if "by-name" in path and len(parts) >= 4:
# /functions/by-name/{name}/decompile
name = parts[3]
for f in fm.getFunctions(True):
if f.getName() == name:
func = f
break
elif len(parts) >= 3:
# /functions/{address}/decompile
addr_str = parts[2]
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
func = fm.getFunctionAt(addr)
except:
pass
if not func:
return {"success": False, "error": "Function not found from path: " + path}
# Decompile
try:
result = self.decompiler.decompileFunction(func, 30, getMonitor())
if result and result.decompileCompleted():
code = result.getDecompiledFunction().getC()
return {
"success": True,
"result": {
"name": func.getName(),
"address": str(func.getEntryPoint()),
"decompiled_text": code,
"ccode": code,
}
}
else:
return {"success": False, "error": "Decompilation failed"}
except Exception as e:
return {"success": False, "error": str(e)}
def handle_decompile(self, exchange):
if not self.program:
return {"success": False, "error": "No program loaded"}
# Get function name from query params
query = exchange.getRequestURI().getQuery()
if not query:
return {"success": False, "error": "Missing 'name' or 'address' parameter"}
params = {}
for part in query.split("&"):
if "=" in part:
k, v = part.split("=", 1)
params[k] = v
name = params.get("name") or params.get("address")
if not name:
return {"success": False, "error": "Missing 'name' or 'address' parameter"}
# Find function
fm = self.program.getFunctionManager()
func = None
for f in fm.getFunctions(True):
if f.getName() == name:
func = f
break
if not func:
try:
addr = self.program.getAddressFactory().getAddress(name)
func = fm.getFunctionAt(addr)
except:
pass
if not func:
return {"success": False, "error": "Function not found: " + name}
# Decompile
try:
result = self.decompiler.decompileFunction(func, 30, getMonitor())
if result and result.decompileCompleted():
code = result.getDecompiledFunction().getC()
return {
"success": True,
"name": func.getName(),
"address": str(func.getEntryPoint()),
"decompiled": code
}
else:
return {"success": False, "error": "Decompilation failed"}
except Exception as e:
return {"success": False, "error": str(e)}
def run_server(port, program, decompiler):
"""Start the HTTP server"""
server = HttpServer.create(InetSocketAddress(port), 0)
server.createContext("/", GhydraMCPHandler(program, decompiler))
server.setExecutor(Executors.newCachedThreadPool())
server.start()
println("GhydraMCP HTTP server started on port " + str(port))
return server
# Main script execution
def main():
port = DEFAULT_PORT
# Parse port from script arguments
args = getScriptArgs()
if args and len(args) > 0:
try:
port = int(args[0])
except:
println("Invalid port number, using default: " + str(DEFAULT_PORT))
# Initialize decompiler
decompiler = DecompInterface()
decompiler.openProgram(currentProgram)
println("=========================================")
println(" GhydraMCP Headless HTTP Server")
println("=========================================")
println(" API Version: " + API_VERSION_STRING + " (compat: " + str(API_VERSION) + ")")
println(" Port: " + str(port))
println(" Program: " + (currentProgram.getName() if currentProgram else "None"))
println(" Script: Python/Jython")
println("=========================================")
server = run_server(port, currentProgram, decompiler)
println("")
println("GhydraMCP Server running. Press Ctrl+C to stop.")
println("API available at: http://localhost:" + str(port) + "/")
# Keep the script running
import time
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
server.stop(0)
println("Server stopped.")
# Run the main function
main()