Some checks are pending
Build Ghidra Plugin / build (push) Waiting to run
- Add GhydraMCPServer.py with fixed strings endpoint (Jython compatible) - Fix strings endpoint to iterate through defined data instead of using DefinedDataIterator.definedStrings() which isn't accessible in Jython - Add entrypoint.sh for Docker container initialization
349 lines
12 KiB
Python
349 lines
12 KiB
Python
# GhydraMCPServer.py - Headless Ghidra script for GhydraMCP HTTP API
|
|
# Python/Jython scripts don't require OSGi bundle registration
|
|
#
|
|
# Usage: analyzeHeadless <project> <name> -import <binary> -postScript GhydraMCPServer.py [port]
|
|
#
|
|
#@category GhydraMCP
|
|
#@keybinding
|
|
#@menupath
|
|
#@toolbar
|
|
|
|
from com.sun.net.httpserver import HttpServer, HttpHandler
|
|
from java.net import InetSocketAddress
|
|
from java.util.concurrent import Executors
|
|
from java.io import OutputStream
|
|
from ghidra.app.decompiler import DecompInterface
|
|
from ghidra.program.model.listing import Function
|
|
import json
|
|
import threading
|
|
|
|
API_VERSION = 2 # Integer for MCP client compatibility (minimum expected: 2)
|
|
API_VERSION_STRING = "2.1"
|
|
DEFAULT_PORT = 8192
|
|
|
|
class GhydraMCPHandler(HttpHandler):
|
|
def __init__(self, program, decompiler):
|
|
self.program = program
|
|
self.decompiler = decompiler
|
|
|
|
def handle(self, exchange):
|
|
try:
|
|
path = exchange.getRequestURI().getPath()
|
|
method = exchange.getRequestMethod()
|
|
|
|
# Route to appropriate handler
|
|
if path == "/" or path == "":
|
|
response = self.handle_root()
|
|
elif path == "/functions":
|
|
response = self.handle_functions()
|
|
elif path.endswith("/decompile"):
|
|
# Handle /functions/{address}/decompile
|
|
response = self.handle_decompile_by_path(path)
|
|
elif path.startswith("/functions/"):
|
|
response = self.handle_function_detail(path)
|
|
elif path == "/strings" or path == "/data/strings":
|
|
response = self.handle_strings()
|
|
elif path == "/info":
|
|
response = self.handle_info()
|
|
elif path == "/decompile":
|
|
response = self.handle_decompile(exchange)
|
|
else:
|
|
response = {"success": False, "error": "Not found", "path": path}
|
|
|
|
self.send_response(exchange, 200, response)
|
|
except Exception as e:
|
|
self.send_response(exchange, 500, {"success": False, "error": str(e)})
|
|
|
|
def send_response(self, exchange, code, data):
|
|
response_bytes = json.dumps(data, indent=2).encode('utf-8')
|
|
exchange.getResponseHeaders().set("Content-Type", "application/json")
|
|
exchange.getResponseHeaders().set("Access-Control-Allow-Origin", "*")
|
|
exchange.sendResponseHeaders(code, len(response_bytes))
|
|
os = exchange.getResponseBody()
|
|
os.write(response_bytes)
|
|
os.close()
|
|
|
|
def handle_root(self):
|
|
return {
|
|
"success": True,
|
|
"api_version": API_VERSION,
|
|
"message": "GhydraMCP API " + API_VERSION_STRING,
|
|
"program": self.program.getName() if self.program else None,
|
|
"endpoints": ["/", "/info", "/functions", "/functions/<name>", "/strings", "/decompile"],
|
|
"_links": {
|
|
"self": "/",
|
|
"functions": "/functions",
|
|
"strings": "/strings",
|
|
"info": "/info"
|
|
}
|
|
}
|
|
|
|
def handle_info(self):
|
|
if not self.program:
|
|
return {"success": False, "error": "No program loaded"}
|
|
return {
|
|
"success": True,
|
|
"name": self.program.getName(),
|
|
"path": self.program.getExecutablePath(),
|
|
"language": str(self.program.getLanguage().getLanguageID()),
|
|
"processor": str(self.program.getLanguage().getProcessor()),
|
|
"addressSize": self.program.getAddressFactory().getDefaultAddressSpace().getSize(),
|
|
"imageBase": str(self.program.getImageBase()),
|
|
}
|
|
|
|
def handle_functions(self):
|
|
if not self.program:
|
|
return {"success": False, "error": "No program loaded"}
|
|
|
|
functions = []
|
|
fm = self.program.getFunctionManager()
|
|
for func in fm.getFunctions(True): # True = forward iteration
|
|
functions.append({
|
|
"name": func.getName(),
|
|
"address": str(func.getEntryPoint()),
|
|
"signature": str(func.getSignature()),
|
|
})
|
|
if len(functions) >= 10000: # Higher limit for MCP client
|
|
break
|
|
|
|
return {
|
|
"success": True,
|
|
"size": len(functions),
|
|
"result": functions, # MCP client expects "result" key
|
|
}
|
|
|
|
def handle_function_detail(self, path):
|
|
if not self.program:
|
|
return {"success": False, "error": "No program loaded"}
|
|
|
|
# Extract function name or address from path
|
|
parts = path.split("/")
|
|
if len(parts) < 3:
|
|
return {"success": False, "error": "Invalid path"}
|
|
|
|
name_or_addr = parts[2]
|
|
fm = self.program.getFunctionManager()
|
|
|
|
# Try to find by name first
|
|
func = None
|
|
for f in fm.getFunctions(True):
|
|
if f.getName() == name_or_addr:
|
|
func = f
|
|
break
|
|
|
|
# If not found, try by address
|
|
if not func:
|
|
try:
|
|
addr = self.program.getAddressFactory().getAddress(name_or_addr)
|
|
func = fm.getFunctionAt(addr)
|
|
except:
|
|
pass
|
|
|
|
if not func:
|
|
return {"success": False, "error": "Function not found: " + name_or_addr}
|
|
|
|
return {
|
|
"success": True,
|
|
"name": func.getName(),
|
|
"address": str(func.getEntryPoint()),
|
|
"signature": str(func.getSignature()),
|
|
"body": str(func.getBody()),
|
|
"callingConvention": func.getCallingConventionName(),
|
|
"parameterCount": func.getParameterCount(),
|
|
}
|
|
|
|
def handle_strings(self):
|
|
if not self.program:
|
|
return {"success": False, "error": "No program loaded"}
|
|
|
|
strings = []
|
|
listing = self.program.getListing()
|
|
|
|
# Iterate through all defined data and filter for string types
|
|
for data in listing.getDefinedData(True): # True = forward iteration
|
|
try:
|
|
dt = data.getDataType()
|
|
if not dt:
|
|
continue
|
|
|
|
# Check if data type is a string variant
|
|
type_name = dt.getName().lower()
|
|
if "string" in type_name or type_name in ("char", "wchar"):
|
|
value = data.getValue()
|
|
if value:
|
|
str_val = str(value)
|
|
if len(str_val) > 1: # Skip single chars
|
|
strings.append({
|
|
"address": str(data.getAddress()),
|
|
"value": str_val[:200], # Truncate long strings
|
|
"length": len(str_val)
|
|
})
|
|
if len(strings) >= 5000:
|
|
break
|
|
except:
|
|
pass
|
|
|
|
return {
|
|
"success": True,
|
|
"size": len(strings),
|
|
"result": strings, # MCP client expects "result" key
|
|
}
|
|
|
|
def handle_decompile_by_path(self, path):
|
|
"""Handle /functions/{address}/decompile or /functions/by-name/{name}/decompile"""
|
|
if not self.program:
|
|
return {"success": False, "error": "No program loaded"}
|
|
|
|
# Parse address or name from path
|
|
# /functions/000496e8/decompile or /functions/by-name/main/decompile
|
|
parts = path.split("/")
|
|
|
|
func = None
|
|
fm = self.program.getFunctionManager()
|
|
|
|
if "by-name" in path and len(parts) >= 4:
|
|
# /functions/by-name/{name}/decompile
|
|
name = parts[3]
|
|
for f in fm.getFunctions(True):
|
|
if f.getName() == name:
|
|
func = f
|
|
break
|
|
elif len(parts) >= 3:
|
|
# /functions/{address}/decompile
|
|
addr_str = parts[2]
|
|
try:
|
|
addr = self.program.getAddressFactory().getAddress(addr_str)
|
|
func = fm.getFunctionAt(addr)
|
|
except:
|
|
pass
|
|
|
|
if not func:
|
|
return {"success": False, "error": "Function not found from path: " + path}
|
|
|
|
# Decompile
|
|
try:
|
|
result = self.decompiler.decompileFunction(func, 30, getMonitor())
|
|
if result and result.decompileCompleted():
|
|
code = result.getDecompiledFunction().getC()
|
|
return {
|
|
"success": True,
|
|
"result": {
|
|
"name": func.getName(),
|
|
"address": str(func.getEntryPoint()),
|
|
"decompiled_text": code,
|
|
"ccode": code,
|
|
}
|
|
}
|
|
else:
|
|
return {"success": False, "error": "Decompilation failed"}
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)}
|
|
|
|
def handle_decompile(self, exchange):
|
|
if not self.program:
|
|
return {"success": False, "error": "No program loaded"}
|
|
|
|
# Get function name from query params
|
|
query = exchange.getRequestURI().getQuery()
|
|
if not query:
|
|
return {"success": False, "error": "Missing 'name' or 'address' parameter"}
|
|
|
|
params = {}
|
|
for part in query.split("&"):
|
|
if "=" in part:
|
|
k, v = part.split("=", 1)
|
|
params[k] = v
|
|
|
|
name = params.get("name") or params.get("address")
|
|
if not name:
|
|
return {"success": False, "error": "Missing 'name' or 'address' parameter"}
|
|
|
|
# Find function
|
|
fm = self.program.getFunctionManager()
|
|
func = None
|
|
for f in fm.getFunctions(True):
|
|
if f.getName() == name:
|
|
func = f
|
|
break
|
|
|
|
if not func:
|
|
try:
|
|
addr = self.program.getAddressFactory().getAddress(name)
|
|
func = fm.getFunctionAt(addr)
|
|
except:
|
|
pass
|
|
|
|
if not func:
|
|
return {"success": False, "error": "Function not found: " + name}
|
|
|
|
# Decompile
|
|
try:
|
|
result = self.decompiler.decompileFunction(func, 30, getMonitor())
|
|
if result and result.decompileCompleted():
|
|
code = result.getDecompiledFunction().getC()
|
|
return {
|
|
"success": True,
|
|
"name": func.getName(),
|
|
"address": str(func.getEntryPoint()),
|
|
"decompiled": code
|
|
}
|
|
else:
|
|
return {"success": False, "error": "Decompilation failed"}
|
|
except Exception as e:
|
|
return {"success": False, "error": str(e)}
|
|
|
|
|
|
def run_server(port, program, decompiler):
|
|
"""Start the HTTP server"""
|
|
server = HttpServer.create(InetSocketAddress(port), 0)
|
|
server.createContext("/", GhydraMCPHandler(program, decompiler))
|
|
server.setExecutor(Executors.newCachedThreadPool())
|
|
server.start()
|
|
println("GhydraMCP HTTP server started on port " + str(port))
|
|
return server
|
|
|
|
|
|
# Main script execution
|
|
def main():
|
|
port = DEFAULT_PORT
|
|
|
|
# Parse port from script arguments
|
|
args = getScriptArgs()
|
|
if args and len(args) > 0:
|
|
try:
|
|
port = int(args[0])
|
|
except:
|
|
println("Invalid port number, using default: " + str(DEFAULT_PORT))
|
|
|
|
# Initialize decompiler
|
|
decompiler = DecompInterface()
|
|
decompiler.openProgram(currentProgram)
|
|
|
|
println("=========================================")
|
|
println(" GhydraMCP Headless HTTP Server")
|
|
println("=========================================")
|
|
println(" API Version: " + API_VERSION_STRING + " (compat: " + str(API_VERSION) + ")")
|
|
println(" Port: " + str(port))
|
|
println(" Program: " + (currentProgram.getName() if currentProgram else "None"))
|
|
println(" Script: Python/Jython")
|
|
println("=========================================")
|
|
|
|
server = run_server(port, currentProgram, decompiler)
|
|
|
|
println("")
|
|
println("GhydraMCP Server running. Press Ctrl+C to stop.")
|
|
println("API available at: http://localhost:" + str(port) + "/")
|
|
|
|
# Keep the script running
|
|
import time
|
|
try:
|
|
while True:
|
|
time.sleep(1)
|
|
except KeyboardInterrupt:
|
|
server.stop(0)
|
|
println("Server stopped.")
|
|
|
|
|
|
# Run the main function
|
|
main()
|