mcghidra/docker/GhydraMCPServer.py
Ryan Malloy 4c112a2421 feat(headless): Expand Python server to full API parity
Rewrite GhydraMCPServer.py from 348 to 2138 lines, implementing all 45
routes that the MCP client expects. Previously, most endpoints returned
{"error": "Not found"}, breaking tools like data_list, xrefs_list, and
memory_read.

Key changes:
- Regex-based routing table with method-aware dispatch
- Thread-safe Ghidra transactions via threading.Lock()
- Full read endpoints: functions, data, strings, memory, xrefs, structs
- Full write endpoints: rename, comment, signature, create function/data
- Analysis endpoints: callgraph traversal, dataflow, run analysis
- Jython/Python 2 compatible (no f-strings, type hints, or walrus ops)

Tested with Docker build and curl against all major endpoint groups.
MCP client integration verified working.
2026-01-27 16:23:27 -07:00

2139 lines
81 KiB
Python

# GhydraMCPServer.py - Headless Ghidra script for GhydraMCP HTTP API
# Full API parity with the Java plugin implementation.
# Python 2 / Jython compatible (no f-strings, no readAllBytes).
#
# Usage: analyzeHeadless <project> <name> -import <binary> -postScript GhydraMCPServer.py [port]
#
#@category GhydraMCP
#@keybinding
#@menupath
#@toolbar
# === Java imports ===
from com.sun.net.httpserver import HttpServer, HttpHandler
from java.net import InetSocketAddress, URLDecoder
from java.util.concurrent import Executors
from java.io import OutputStream, BufferedReader, InputStreamReader
# === Ghidra imports ===
from ghidra.app.decompiler import DecompInterface
from ghidra.program.model.symbol import SourceType
from ghidra.program.model.listing import CodeUnit
# === Python imports ===
import json
import re
import threading
import time
# === Constants ===
# Integer API version reported by the root endpoint ("api_version").
API_VERSION = 2 # Integer for MCP client compatibility (minimum expected: 2)
# Human-readable version reported next to it ("api_version_string").
API_VERSION_STRING = "2.1"
# NOTE(review): presumably the listen port used when no [port] script
# argument is given -- confirm against the startup code.
DEFAULT_PORT = 8192
# === Thread-safe transaction lock ===
# Held by with_transaction() for the whole duration of a Ghidra transaction,
# so only one transaction opened through that helper is active at a time.
_tx_lock = threading.Lock()
# ========================================================================
# Utility Functions
# ========================================================================
def url_decode(s):
    """Decode a URL-encoded string as UTF-8 via Java's URLDecoder.

    The input is returned unchanged when decoding fails for any reason.
    """
    try:
        decoded = URLDecoder.decode(s, "UTF-8")
    except:
        # A bare except also covers failures raised on the Java side.
        decoded = s
    return decoded
def parse_query_params(exchange):
    """Parse the query string of an HttpExchange into a dict.

    The raw (still percent-encoded) query is split on "&" and "=" first, and
    only then is each key and value decoded, exactly once. Working on the
    already-decoded query would decode values twice and would split values
    that contain an encoded "&" or "=".

    Returns a dict of decoded names to decoded values. A parameter without
    "=" maps to "". When a name is repeated, the last occurrence wins.
    """
    params = {}
    raw_query = exchange.getRequestURI().getRawQuery()
    if not raw_query:
        return params
    for part in raw_query.split("&"):
        if not part:
            # Tolerate stray separators such as "a=1&&b=2".
            continue
        key, sep, value = part.partition("=")
        params[url_decode(key)] = url_decode(value) if sep else ""
    return params
def read_request_body(exchange):
    """Read the request body as a string (Jython-compatible, no readAllBytes).

    The body is read line by line as UTF-8 and the lines are re-joined with
    "\\n", so the original line terminators are not preserved.
    """
    reader = BufferedReader(InputStreamReader(exchange.getRequestBody(), "UTF-8"))
    lines = []
    try:
        line = reader.readLine()
        while line is not None:
            lines.append(line)
            line = reader.readLine()
    finally:
        # Close the stream even when a read fails part-way through.
        reader.close()
    return "\n".join(lines)
def parse_json_body(exchange):
    """Read the request body and parse it as a JSON object.

    Always returns a dict. An empty body, a body that is not valid JSON, and
    a body whose top-level value is not an object (list, string, number, ...)
    all yield an empty dict, so callers can use ``.get()`` unconditionally.
    """
    body = read_request_body(exchange)
    if not body or not body.strip():
        return {}
    try:
        parsed = json.loads(body)
    except Exception:
        return {}
    if not isinstance(parsed, dict):
        return {}
    return parsed
def parse_int(value, default):
    """Convert value to an int, returning default when that is not possible.

    default is returned for None and for anything int() rejects with a
    ValueError or TypeError.
    """
    if value is not None:
        try:
            return int(value)
        except (ValueError, TypeError):
            pass
    return default
def bytes_to_hex(byte_array):
    """Render a sequence of byte values as a lowercase hex string.

    Each value is masked to 8 bits first, so negative values (as found in
    signed Java byte arrays) come out as their two's-complement byte.
    """
    return "".join("%02x" % (value & 0xff) for value in byte_array)
def hex_to_bytes(hex_str):
    """Convert a hex string to a list of integer byte values.

    Two hex digits make one byte, so the string must have an even length.
    An empty string gives an empty list.

    Raises:
        ValueError: if the string has an odd number of characters (the last
            nibble would otherwise be decoded as a whole byte) or contains a
            pair that int(..., 16) rejects.
    """
    if len(hex_str) % 2 != 0:
        raise ValueError(
            "Hex string must contain an even number of digits: %r" % (hex_str,)
        )
    return [int(hex_str[i:i + 2], 16) for i in range(0, len(hex_str), 2)]
def make_link(href):
    """Wrap a URL in the link object used inside "_links" sections."""
    link = {}
    link["href"] = href
    return link
def with_transaction(program, desc, fn):
    """Run fn() inside a Ghidra transaction while holding the module lock.

    The transaction is committed when fn() returns and rolled back when
    anything raises; the exception is re-raised to the caller. The lock is
    released in every case.

    Returns whatever fn() returns.
    """
    with _tx_lock:
        tx_id = program.startTransaction(desc)
        try:
            outcome = fn()
            program.endTransaction(tx_id, True)
        except:
            # Bare except so Java-side failures roll the transaction back too.
            program.endTransaction(tx_id, False)
            raise
        return outcome
# ========================================================================
# Data Type Resolution
# ========================================================================
_BUILTIN_TYPES = None
def _init_builtin_types():
"""Lazily initialize the builtin data type map."""
global _BUILTIN_TYPES
if _BUILTIN_TYPES is not None:
return _BUILTIN_TYPES
_BUILTIN_TYPES = {}
try:
from ghidra.program.model.data import (
ByteDataType, WordDataType, DWordDataType, QWordDataType,
FloatDataType, DoubleDataType, CharDataType,
ShortDataType, IntegerDataType, LongDataType, LongLongDataType,
PointerDataType, BooleanDataType,
StringDataType, UnicodeDataType,
Undefined1DataType, Undefined2DataType,
Undefined4DataType, Undefined8DataType
)
pairs = [
("byte", ByteDataType), ("word", WordDataType),
("dword", DWordDataType), ("qword", QWordDataType),
("float", FloatDataType), ("double", DoubleDataType),
("char", CharDataType), ("short", ShortDataType),
("int", IntegerDataType), ("long", LongDataType),
("longlong", LongLongDataType),
("pointer", PointerDataType), ("bool", BooleanDataType),
("boolean", BooleanDataType),
("string", StringDataType), ("unicode", UnicodeDataType),
("undefined1", Undefined1DataType), ("undefined2", Undefined2DataType),
("undefined4", Undefined4DataType), ("undefined8", Undefined8DataType),
]
for name, cls in pairs:
try:
_BUILTIN_TYPES[name] = cls.dataType
except:
pass
except ImportError:
pass
# Also try unsigned variants
try:
from ghidra.program.model.data import (
UnsignedShortDataType, UnsignedIntegerDataType,
UnsignedLongDataType, UnsignedLongLongDataType
)
for name, cls in [
("ushort", UnsignedShortDataType), ("uint", UnsignedIntegerDataType),
("ulong", UnsignedLongDataType), ("ulonglong", UnsignedLongLongDataType),
]:
try:
_BUILTIN_TYPES[name] = cls.dataType
except:
pass
except ImportError:
pass
return _BUILTIN_TYPES
def resolve_data_type(dtm, type_name):
    """Look up a Ghidra data type from a type name string.

    Tried in order: the root-category path "/<type_name>" in dtm, the builtin
    type map (case-insensitive), then a case-insensitive scan over every
    type known to dtm. Returns None for an empty name or when nothing
    matches.
    """
    if not type_name:
        return None
    wanted = type_name.lower()

    found = dtm.getDataType("/" + type_name)
    if found is None:
        found = _init_builtin_types().get(wanted)
    if found is None:
        # Expensive last resort: walk all data types in the manager.
        all_types = dtm.getAllDataTypes()
        while all_types.hasNext():
            candidate = all_types.next()
            if candidate.getName().lower() == wanted:
                found = candidate
                break
    return found
# ========================================================================
# Comment type mapping
# ========================================================================
# Short comment-type names mapped to the matching Ghidra CodeUnit comment
# constants.
# NOTE(review): presumably keyed by the {type} segment of
# POST /memory/{address}/comments/{type} -- confirm against handle_set_comment.
COMMENT_TYPE_MAP = {
    "pre": CodeUnit.PRE_COMMENT,
    "post": CodeUnit.POST_COMMENT,
    "eol": CodeUnit.EOL_COMMENT,
    "plate": CodeUnit.PLATE_COMMENT,
    "repeatable": CodeUnit.REPEATABLE_COMMENT,
}
# ========================================================================
# Route Table
# ========================================================================
# Routing table consumed by GhydraMCPHandler. Entries are tried top to bottom
# and the first one whose HTTP method equals the request method and whose
# regex matches the request path is used, so more specific patterns must come
# before more general ones. Each regex capture group is passed to the handler
# method as an extra positional argument after the exchange.
ROUTES = [
    # (HTTP method, path regex, handler method name)
    # Root / Meta
    ("GET", r"^/$", "handle_root"),
    ("GET", r"^/info$", "handle_info"),
    ("GET", r"^/program$", "handle_program"),
    # Headless stubs (cursor-dependent, not available headless)
    ("GET", r"^/address$", "handle_address_stub"),
    ("GET", r"^/function$", "handle_function_stub"),
    # Functions - by-name variants (must precede address patterns)
    ("GET", r"^/functions/by-name/([^/]+)/decompile$", "handle_decompile_by_name"),
    ("GET", r"^/functions/by-name/([^/]+)/disassembly$", "handle_disassembly_by_name"),
    ("PUT", r"^/functions/by-name/([^/]+)/signature$", "handle_signature_by_name"),
    ("PATCH", r"^/functions/by-name/([^/]+)$", "handle_patch_function_by_name"),
    ("GET", r"^/functions/by-name/([^/]+)$", "handle_function_by_name"),
    # Functions - by address
    ("GET", r"^/functions/([^/]+)/decompile$", "handle_decompile"),
    ("GET", r"^/functions/([^/]+)/disassembly$", "handle_disassembly"),
    ("GET", r"^/functions/([^/]+)/variables$", "handle_function_variables"),
    ("PUT", r"^/functions/([^/]+)/signature$", "handle_signature"),
    ("PATCH", r"^/functions/([^/]+)$", "handle_patch_function"),
    ("GET", r"^/functions/([^/]+)$", "handle_function_detail"),
    # Functions list / create
    ("GET", r"^/functions/?$", "handle_functions_list"),
    ("POST", r"^/functions/?$", "handle_create_function"),
    # Data
    ("POST", r"^/data/delete$", "handle_data_delete"),
    ("POST", r"^/data/type$", "handle_data_type"),
    ("GET", r"^/data/strings$", "handle_strings"),
    ("GET", r"^/data/?$", "handle_data_list"),
    ("POST", r"^/data/?$", "handle_data_create"),
    # Strings (same handler as /data/strings)
    ("GET", r"^/strings$", "handle_strings"),
    # Memory
    ("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"),
    ("GET", r"^/memory/blocks$", "handle_memory_blocks"),
    ("GET", r"^/memory$", "handle_memory_read"),
    ("PATCH", r"^/programs/current/memory/([^/]+)$", "handle_memory_write"),
    # Segments
    ("GET", r"^/segments$", "handle_segments"),
    # Symbols
    ("GET", r"^/symbols/imports$", "handle_imports"),
    ("GET", r"^/symbols/exports$", "handle_exports"),
    ("GET", r"^/symbols$", "handle_symbols"),
    # Cross-references
    ("GET", r"^/xrefs$", "handle_xrefs"),
    # Classes & Namespaces
    ("GET", r"^/classes$", "handle_classes"),
    ("GET", r"^/namespaces$", "handle_namespaces"),
    # Structs
    ("POST", r"^/structs/create$", "handle_struct_create"),
    ("POST", r"^/structs/addfield$", "handle_struct_addfield"),
    ("POST", r"^/structs/updatefield$", "handle_struct_updatefield"),
    ("POST", r"^/structs/delete$", "handle_struct_delete"),
    ("GET", r"^/structs$", "handle_structs"),
    # Analysis
    ("GET", r"^/analysis/callgraph$", "handle_callgraph"),
    ("GET", r"^/analysis/dataflow$", "handle_dataflow"),
    ("GET", r"^/analysis$", "handle_analysis_info"),
    ("POST", r"^/analysis$", "handle_analysis_run"),
    # Legacy compatibility
    ("GET", r"^/decompile$", "handle_decompile_legacy"),
]
# ========================================================================
# HTTP Handler
# ========================================================================
class GhydraMCPHandler(HttpHandler):
def __init__(self, program, decompiler):
    """Keep the program and decompiler and compile the route table once.

    self.routes holds (method, compiled regex, handler name) tuples in the
    same order as ROUTES.
    """
    self.program = program
    self.decompiler = decompiler
    # Pre-compile route patterns
    self.routes = [
        (verb, re.compile(path_regex), target)
        for verb, path_regex, target in ROUTES
    ]
# ------------------------------------------------------------------
# Dispatch
# ------------------------------------------------------------------
def handle(self, exchange):
    """Dispatch one HTTP request to the first matching route handler.

    OPTIONS requests get a CORS preflight reply. Otherwise the routes are
    tried in order; the handler of the first match is called with the
    exchange plus the decoded capture groups. A handler returns either a
    response dict (sent with status 200) or a (dict, status) tuple. Requests
    that match no route get a 404, and an exception raised while handling
    is reported as a 500 on a best-effort basis.
    """
    try:
        method = exchange.getRequestMethod()
        uri = exchange.getRequestURI()
        path = uri.getPath()
        # Routes are matched against the raw (still percent-encoded) path.
        # getPath() is already decoded, so decoding its groups again would
        # decode twice and would let an encoded "/" split a segment.
        raw_path = uri.getRawPath()
        # CORS preflight
        if method == "OPTIONS":
            self._send_cors_preflight(exchange)
            return
        # Match routes
        for route_method, pattern, handler_name in self.routes:
            if method != route_method:
                continue
            match = pattern.match(raw_path)
            if not match:
                continue
            handler = getattr(self, handler_name)
            # Decode each segment exactly once. "+" is a literal character
            # in a URL path, but URLDecoder would turn it into a space
            # (breaking names such as "operator+"), so protect it first.
            decoded = tuple(
                group if group is None
                else url_decode(group.replace("+", "%2B"))
                for group in match.groups()
            )
            result = handler(exchange, *decoded)
            if isinstance(result, tuple):
                response, code = result
            else:
                response, code = result, 200
            self._send_response(exchange, code, response)
            return
        # No route matched
        self._send_response(exchange, 404, {
            "success": False,
            "error": {"code": "NOT_FOUND", "message": "Endpoint not found: %s %s" % (method, path)}
        })
    except Exception as e:
        try:
            self._send_response(exchange, 500, {
                "success": False,
                "error": {"code": "INTERNAL_ERROR", "message": str(e)}
            })
        except:
            # Deliberate best effort: the response may already be partly
            # sent or the client may be gone, so nothing more can be done.
            pass
def _send_response(self, exchange, code, data):
    """Serialize data as indented JSON and send it with the given status.

    The CORS headers are added to every response. The response stream is
    closed even when writing the body fails.
    """
    response_bytes = json.dumps(data, indent=2).encode('utf-8')
    headers = exchange.getResponseHeaders()
    headers.set("Content-Type", "application/json; charset=utf-8")
    headers.set("Access-Control-Allow-Origin", "*")
    headers.set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS")
    headers.set("Access-Control-Allow-Headers", "Content-Type, X-Request-ID")
    exchange.sendResponseHeaders(code, len(response_bytes))
    out = exchange.getResponseBody()
    try:
        out.write(response_bytes)
    finally:
        out.close()
def _send_cors_preflight(self, exchange):
    """Reply to a CORS preflight request with 204 and no body."""
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS"),
        ("Access-Control-Allow-Headers", "Content-Type, X-Request-ID"),
        ("Access-Control-Max-Age", "86400"),
    )
    response_headers = exchange.getResponseHeaders()
    for header_name, header_value in cors_headers:
        response_headers.set(header_name, header_value)
    # A length of -1 tells the server that no response body follows.
    exchange.sendResponseHeaders(204, -1)
    exchange.getResponseBody().close()
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _no_program(self):
return {"success": False, "error": {"code": "NO_PROGRAM_LOADED", "message": "No program loaded"}}
def _find_function_at(self, addr_str):
    """Look up a function from an address string.

    The function starting exactly at the address is preferred; otherwise the
    function containing the address is returned. Returns None when there is
    no such function or when any step of the lookup raises.
    """
    try:
        program = self.program
        address = program.getAddressFactory().getAddress(addr_str)
        functions = program.getFunctionManager()
        found = functions.getFunctionAt(address)
        if found is not None:
            return found
        return functions.getFunctionContaining(address)
    except:
        # Bare except so Java-side lookup failures also count as "not found".
        return None
def _find_function_by_name(self, name):
    """Return the first function whose name equals name exactly, or None.

    Functions are scanned in the forward order of the function manager.
    """
    candidates = self.program.getFunctionManager().getFunctions(True)
    return next((func for func in candidates if func.getName() == name), None)
def _get_function(self, identifier, by_name=False):
    """Resolve a function from a name (by_name=True) or an address string."""
    lookup = self._find_function_by_name if by_name else self._find_function_at
    return lookup(identifier)
def _decompile_function(self, func):
    """Decompile func (60 second timeout) and wrap the outcome in a response.

    On success the C text is stored under "decompiled", "decompiled_text"
    and "ccode" alongside the signature. On failure an "error" entry is
    stored instead; the envelope still reports "success": True.
    """
    outcome = self.decompiler.decompileFunction(func, 60, getMonitor())
    entry = str(func.getEntryPoint())
    payload = {
        "name": func.getName(),
        "address": entry,
    }
    if outcome and outcome.decompileCompleted():
        decompiled = outcome.getDecompiledFunction()
        c_text = decompiled.getC()
        # "decompiled_text" is the alias for the MCP client and "ccode" the
        # alias for the API spec.
        for key in ("decompiled", "decompiled_text", "ccode"):
            payload[key] = c_text
        payload["signature"] = decompiled.getSignature()
    else:
        if outcome:
            reason = outcome.getErrorMessage()
        else:
            reason = "Unknown error"
        payload["error"] = "Decompilation failed: %s" % reason
    payload["_links"] = {
        "self": make_link("/functions/%s/decompile" % entry),
        "function": make_link("/functions/%s" % entry),
        "disassembly": make_link("/functions/%s/disassembly" % entry),
    }
    return {"success": True, "result": payload}
def _build_disassembly(self, func):
    """Build the disassembly response for every instruction in func's body.

    Each instruction is reported with its address, mnemonic, operands (the
    instruction text after the mnemonic) and raw bytes as hex. A plain-text
    listing with one "address mnemonic operands" line per instruction is
    included as "disassembly_text".
    """
    instructions = []
    text_lines = []
    cursor = self.program.getListing().getInstructions(func.getBody(), True)
    while cursor.hasNext():
        instr = cursor.next()
        rendered = str(instr)
        mnemonic = instr.getMnemonicString()
        if len(rendered) > len(mnemonic):
            operands = rendered[len(mnemonic):].strip()
        else:
            operands = ""
        location = str(instr.getAddress())
        instructions.append({
            "address": location,
            "mnemonic": mnemonic,
            "operands": operands,
            "bytes": bytes_to_hex(instr.getBytes()),
        })
        # Text representation for the MCP client
        text_lines.append("%s %s %s" % (location, mnemonic, operands))
    entry = str(func.getEntryPoint())
    payload = {
        "name": func.getName(),
        "address": entry,
        "instructions": instructions,
        "instructionCount": len(instructions),
        "disassembly_text": "\n".join(text_lines),
        "_links": {
            "self": make_link("/functions/%s/disassembly" % entry),
            "function": make_link("/functions/%s" % entry),
            "decompile": make_link("/functions/%s/decompile" % entry),
        },
    }
    return {"success": True, "result": payload}
def _build_function_info(self, func):
    """Assemble the detailed description dict for one function.

    Covers the basic attributes, the parameter list, any non-empty comments
    on the code unit at the entry point, the function's own comment and the
    HATEOAS links.
    """
    entry = func.getEntryPoint()
    addr = str(entry)
    info = {
        "name": func.getName(),
        "address": addr,
        "signature": str(func.getSignature()),
        "returnType": func.getReturnType().getName(),
        "parameterCount": func.getParameterCount(),
        "callingConvention": func.getCallingConventionName(),
        "isThunk": func.isThunk(),
        "isExternal": func.isExternal(),
        "bodySize": func.getBody().getNumAddresses(),
    }
    info["parameters"] = [
        {
            "name": param.getName(),
            "type": param.getDataType().getName(),
            "ordinal": param.getOrdinal(),
        }
        for param in func.getParameters()
    ]
    # Comments attached to the code unit at the entry point
    code_unit = self.program.getListing().getCodeUnitAt(entry)
    if code_unit:
        comment_keys = (
            (CodeUnit.PRE_COMMENT, "preComment"),
            (CodeUnit.POST_COMMENT, "postComment"),
            (CodeUnit.EOL_COMMENT, "eolComment"),
            (CodeUnit.PLATE_COMMENT, "plateComment"),
        )
        for comment_type, key in comment_keys:
            text = code_unit.getComment(comment_type)
            if text:
                info[key] = text
    function_comment = func.getComment()
    if function_comment:
        info["comment"] = function_comment
    # HATEOAS links
    info["_links"] = {
        "self": make_link("/functions/%s" % addr),
        "decompile": make_link("/functions/%s/decompile" % addr),
        "disassembly": make_link("/functions/%s/disassembly" % addr),
        "variables": make_link("/functions/%s/variables" % addr),
        "xrefs_to": make_link("/xrefs?to_addr=%s" % addr),
        "xrefs_from": make_link("/xrefs?from_addr=%s" % addr),
    }
    return info
def _build_xref_info(self, ref):
    """Describe one reference as a dict.

    "fromFunction" / "toFunction" are added only when the respective address
    lies inside a function.
    """
    source = ref.getFromAddress()
    target = ref.getToAddress()
    info = {
        "fromAddress": str(source),
        "toAddress": str(target),
        "type": self._get_ref_type_name(ref.getReferenceType()),
        "isPrimary": ref.isPrimary(),
    }
    functions = self.program.getFunctionManager()
    for key, address in (("fromFunction", source), ("toFunction", target)):
        owner = functions.getFunctionContaining(address)
        if owner:
            info[key] = owner.getName()
    return info
def _get_ref_type_name(self, ref_type):
if ref_type.isCall():
return "CALL"
if ref_type.isData():
return "DATA"
if ref_type.isRead():
return "READ"
if ref_type.isWrite():
return "WRITE"
if ref_type.isJump():
return "JUMP"
return str(ref_type)
def _find_struct(self, name):
    """Return the first Structure or Union data type named name, or None.

    Every data type in the program's data type manager is scanned; types
    with a matching name that are neither structures nor unions are skipped.
    """
    all_types = self.program.getDataTypeManager().getAllDataTypes()
    while all_types.hasNext():
        candidate = all_types.next()
        if candidate.getName() != name:
            continue
        from ghidra.program.model.data import Structure, Union
        if isinstance(candidate, (Structure, Union)):
            return candidate
    return None
# ==================================================================
# Root / Meta Handlers
# ==================================================================
def handle_root(self, exchange):
    """GET / -- API version information plus links to the top-level resources.

    Program name, executable path and language are included only when a
    program is loaded.
    """
    result = {
        "success": True,
        "api_version": API_VERSION,
        "api_version_string": API_VERSION_STRING,
        "message": "GhydraMCP Headless API",
        "mode": "headless",
    }
    program = self.program
    if program:
        result["program"] = program.getName()
        result["file"] = program.getExecutablePath()
        result["language"] = str(program.getLanguageID())
    links = {"self": make_link("/")}
    resources = (
        "info", "program", "functions", "data", "strings", "memory",
        "segments", "symbols", "xrefs", "classes", "namespaces",
        "structs", "analysis",
    )
    for resource in resources:
        links[resource] = make_link("/" + resource)
    result["_links"] = links
    return result
def handle_info(self, exchange):
    """GET /info -- summary of the loaded program (language, memory range,
    function count), or the no-program error."""
    program = self.program
    if not program:
        return self._no_program()
    memory = program.getMemory()
    details = {
        "name": program.getName(),
        "path": program.getExecutablePath(),
        "language": str(program.getLanguageID()),
        "processor": str(program.getLanguage().getProcessor()),
        "addressSize": program.getAddressFactory().getDefaultAddressSpace().getSize(),
        "imageBase": str(program.getImageBase()),
        "minAddress": str(memory.getMinAddress()),
        "maxAddress": str(memory.getMaxAddress()),
        "memorySize": memory.getSize(),
        "functionCount": program.getFunctionManager().getFunctionCount(),
        "mode": "headless",
    }
    details["_links"] = {
        "self": make_link("/info"),
        "program": make_link("/program"),
        "functions": make_link("/functions"),
    }
    return {"success": True, "result": details}
def handle_program(self, exchange):
    """GET /program -- details of the loaded program including its compiler
    spec, or the no-program error."""
    program = self.program
    if not program:
        return self._no_program()
    memory = program.getMemory()
    functions = program.getFunctionManager()
    details = {
        "name": program.getName(),
        "path": program.getExecutablePath(),
        "language": str(program.getLanguageID()),
        "compiler": str(program.getCompilerSpec().getCompilerSpecID()),
        "processor": str(program.getLanguage().getProcessor()),
        "addressSize": program.getAddressFactory().getDefaultAddressSpace().getSize(),
        "imageBase": str(program.getImageBase()),
        "minAddress": str(memory.getMinAddress()),
        "maxAddress": str(memory.getMaxAddress()),
        "memorySize": memory.getSize(),
        "functionCount": functions.getFunctionCount(),
    }
    details["_links"] = {
        "self": make_link("/program"),
        "functions": make_link("/functions"),
        "symbols": make_link("/symbols"),
        "segments": make_link("/segments"),
        "analysis": make_link("/analysis"),
    }
    return {"success": True, "result": details}
# ==================================================================
# Headless Stubs
# ==================================================================
def handle_address_stub(self, exchange):
    """GET /address -- always an error: there is no current address headless."""
    error = {
        "code": "HEADLESS_MODE",
        "message": "Current address is not available in headless mode. Use /functions or /data with address parameters instead."
    }
    return {"success": False, "error": error}
def handle_function_stub(self, exchange):
    """GET /function -- always an error: there is no current function headless."""
    error = {
        "code": "HEADLESS_MODE",
        "message": "Current function is not available in headless mode. Use /functions/{address} instead."
    }
    return {"success": False, "error": error}
# ==================================================================
# Function Handlers
# ==================================================================
def handle_functions_list(self, exchange):
    """GET /functions -- paginated function list with optional name filter.

    Query parameters:
        limit:  maximum number of entries (default 100, negatives become 0).
        offset: number of matching entries to skip (default 0, negatives
                become 0).
        name:   case-insensitive substring the function name must contain.

    "size" is the number of functions the listing ranges over: all functions
    when unfiltered, or the number of filter matches otherwise, so that
    "size" and the "next" link stay consistent with the returned page. The
    paging links carry the name filter along.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = max(0, parse_int(params.get("limit"), 100))
    offset = max(0, parse_int(params.get("offset"), 0))
    name_filter = params.get("name")
    needle = name_filter.lower() if name_filter else None

    fm = self.program.getFunctionManager()
    functions = []
    matched = 0
    for func in fm.getFunctions(True):
        if needle is None:
            # Unfiltered: the total comes from the manager, so stop as soon
            # as the page is full.
            if len(functions) >= limit:
                break
        elif needle not in func.getName().lower():
            continue
        matched += 1
        # Filtered: keep counting matches past the page to learn the total.
        if matched <= offset or len(functions) >= limit:
            continue
        addr = str(func.getEntryPoint())
        functions.append({
            "name": func.getName(),
            "address": addr,
            "signature": str(func.getSignature()),
            "parameterCount": func.getParameterCount(),
            "isThunk": func.isThunk(),
            "_links": {
                "self": make_link("/functions/%s" % addr),
                "decompile": make_link("/functions/%s/decompile" % addr),
                "disassembly": make_link("/functions/%s/disassembly" % addr),
            },
        })
    total = fm.getFunctionCount() if needle is None else matched

    query_suffix = ""
    if name_filter:
        from java.net import URLEncoder
        query_suffix = "&name=%s" % URLEncoder.encode(name_filter, "UTF-8")

    def page_link(page_offset):
        return make_link(
            "/functions?offset=%d&limit=%d%s" % (page_offset, limit, query_suffix)
        )

    result = {
        "success": True,
        "result": functions,
        "size": total,
        "offset": offset,
        "limit": limit,
        "_links": {
            "self": page_link(offset),
        },
    }
    # With limit 0 a "next" link would point back at the same empty page.
    if limit > 0 and offset + len(functions) < total:
        result["_links"]["next"] = page_link(offset + limit)
    if offset > 0:
        result["_links"]["prev"] = page_link(max(0, offset - limit))
    return result
def handle_function_detail(self, exchange, addr_str):
    """GET /functions/{address} -- details of the function at or containing
    the address, or a FUNCTION_NOT_FOUND error."""
    if not self.program:
        return self._no_program()
    func = self._find_function_at(addr_str)
    if func:
        return {"success": True, "result": self._build_function_info(func)}
    message = "No function at address: %s" % addr_str
    return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": message}}
def handle_function_by_name(self, exchange, name):
    """GET /functions/by-name/{name} -- details of the function with exactly
    this name, or a FUNCTION_NOT_FOUND error."""
    if not self.program:
        return self._no_program()
    func = self._find_function_by_name(name)
    if func:
        return {"success": True, "result": self._build_function_info(func)}
    message = "Function not found: %s" % name
    return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": message}}
# -- Decompile --
def handle_decompile(self, exchange, addr_str):
    """GET /functions/{address}/decompile -- decompile the function at or
    containing the address, or report FUNCTION_NOT_FOUND."""
    if not self.program:
        return self._no_program()
    func = self._find_function_at(addr_str)
    if func:
        return self._decompile_function(func)
    message = "No function at address: %s" % addr_str
    return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": message}}
def handle_decompile_by_name(self, exchange, name):
    """GET /functions/by-name/{name}/decompile -- decompile the function with
    exactly this name, or report FUNCTION_NOT_FOUND."""
    if not self.program:
        return self._no_program()
    func = self._find_function_by_name(name)
    if func:
        return self._decompile_function(func)
    message = "Function not found: %s" % name
    return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": message}}
# -- Disassembly --
def handle_disassembly(self, exchange, addr_str):
    """GET /functions/{address}/disassembly -- disassemble the function at or
    containing the address, or report FUNCTION_NOT_FOUND."""
    if not self.program:
        return self._no_program()
    func = self._find_function_at(addr_str)
    if func:
        return self._build_disassembly(func)
    message = "No function at address: %s" % addr_str
    return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": message}}
def handle_disassembly_by_name(self, exchange, name):
    """GET /functions/by-name/{name}/disassembly -- disassemble the function
    with exactly this name, or report FUNCTION_NOT_FOUND."""
    if not self.program:
        return self._no_program()
    func = self._find_function_by_name(name)
    if func:
        return self._build_disassembly(func)
    message = "Function not found: %s" % name
    return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": message}}
# -- Variables --
def handle_function_variables(self, exchange, addr_str):
    """GET /functions/{address}/variables -- parameters and local variables.

    Parameters are listed first (kind "parameter"), then locals (kind
    "local"). A local gets a "stackOffset" entry only when its stack offset
    can be read and is non-zero.
    """
    if not self.program:
        return self._no_program()
    func = self._find_function_at(addr_str)
    if not func:
        message = "No function at address: %s" % addr_str
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": message}}
    # Parameters
    variables = [
        {
            "name": param.getName(),
            "type": param.getDataType().getName(),
            "storage": str(param.getVariableStorage()),
            "kind": "parameter",
            "ordinal": param.getOrdinal(),
        }
        for param in func.getParameters()
    ]
    # Local variables
    local_vars = func.getLocalVariables()
    for local in local_vars:
        entry = {
            "name": local.getName(),
            "type": local.getDataType().getName(),
            "storage": str(local.getVariableStorage()),
            "kind": "local",
        }
        try:
            stack_offset = local.getStackOffset()
            if stack_offset != 0:
                entry["stackOffset"] = stack_offset
        except:
            # Reading the stack offset failed; report the variable without it.
            pass
        variables.append(entry)
    payload = {
        "name": func.getName(),
        "address": str(func.getEntryPoint()),
        "variables": variables,
        "parameterCount": func.getParameterCount(),
        "localVariableCount": len(local_vars),
    }
    return {"success": True, "result": payload}
# -- Patch (rename / comment) --
def _handle_patch_function_impl(self, exchange, identifier, by_name):
    """Rename a function and/or set its comment from a JSON request body.

    The body may carry "name" (new function name) and/or "comment". At least
    one of them is required, otherwise MISSING_PARAMETER is returned. Both
    changes are applied in a single transaction.
    """
    if not self.program:
        return self._no_program()
    func = self._get_function(identifier, by_name)
    if not func:
        message = "Function not found: %s" % identifier
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": message}}
    payload = parse_json_body(exchange)
    new_name = payload.get("name")
    comment = payload.get("comment")
    has_comment = comment is not None
    if not (new_name or has_comment):
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide 'name' and/or 'comment' in request body"}}
    result = {"address": str(func.getEntryPoint()), "oldName": func.getName()}

    def apply_changes():
        if new_name:
            func.setName(new_name, SourceType.USER_DEFINED)
            result["newName"] = new_name
        if has_comment:
            func.setComment(comment)
            result["comment"] = comment

    with_transaction(self.program, "Patch function", apply_changes)
    result["message"] = "Function updated successfully"
    return {"success": True, "result": result}
def handle_patch_function(self, exchange, addr_str):
    """PATCH /functions/{address} -- rename/comment a function found by address."""
    return self._handle_patch_function_impl(exchange, addr_str, False)
def handle_patch_function_by_name(self, exchange, name):
    """PATCH /functions/by-name/{name} -- rename/comment a function found by name."""
    return self._handle_patch_function_impl(exchange, name, True)
# -- Create function --
def handle_create_function(self, exchange):
    """POST /functions -- create a function at the "address" from the body.

    The function is created with CreateFunctionCmd, which works out the
    function body itself. Returns a (response, 201) tuple on success and an
    error response otherwise: MISSING_PARAMETER, INVALID_ADDRESS,
    CREATE_FAILED (the command reported failure) or CREATE_ERROR (an
    exception was raised).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    addr_str = body.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' in request body"}}
    invalid_address = {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return invalid_address
    # An unparseable string yields no address rather than an exception.
    if addr is None:
        return invalid_address
    result_holder = [None]

    def do_create():
        from ghidra.app.cmd.function import CreateFunctionCmd
        # No name and no body: the command picks a default name and
        # computes the body from the entry point.
        cmd = CreateFunctionCmd(None, addr, None, SourceType.USER_DEFINED)
        if cmd.applyTo(self.program, getMonitor()):
            result_holder[0] = self.program.getFunctionManager().getFunctionAt(addr)

    try:
        with_transaction(self.program, "Create function", do_create)
        func = result_holder[0]
        if func:
            return ({"success": True, "result": {
                "name": func.getName(),
                "address": str(func.getEntryPoint()),
                "message": "Function created successfully",
            }}, 201)
        return {"success": False, "error": {"code": "CREATE_FAILED", "message": "Failed to create function at %s" % addr_str}}
    except Exception as e:
        return {"success": False, "error": {"code": "CREATE_ERROR", "message": str(e)}}
# -- Signature --
def _handle_signature_impl(self, exchange, identifier, by_name):
    """Apply a C function signature from the request body to a function.

    The body must carry "signature". It is parsed with Ghidra's CParser (a
    ";" is appended) and, when it parses to a function definition, applied
    with ApplyFunctionSignatureCmd inside a transaction. Success is reported
    only when the signature was actually applied; every other outcome gives
    a SIGNATURE_ERROR (or MISSING_PARAMETER / FUNCTION_NOT_FOUND) response.
    """
    if not self.program:
        return self._no_program()
    func = self._get_function(identifier, by_name)
    if not func:
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % identifier}}
    body = parse_json_body(exchange)
    sig_str = body.get("signature")
    if not sig_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'signature' in request body"}}
    addr = str(func.getEntryPoint())
    try:
        from ghidra.app.util.cparser.C import CParser
        from ghidra.program.model.data import FunctionDefinitionDataType
        from ghidra.app.cmd.function import ApplyFunctionSignatureCmd
        dtm = self.program.getDataTypeManager()
        parser = CParser(dtm)
        parsed = parser.parse(sig_str + ";")
        if not isinstance(parsed, FunctionDefinitionDataType):
            # The text parsed, but not to a function prototype, so there is
            # nothing that could be applied.
            return {"success": False, "error": {"code": "SIGNATURE_ERROR", "message": "Could not parse signature: not a function prototype"}}
        outcome = {"applied": False, "status": None}

        def do_set_sig():
            cmd = ApplyFunctionSignatureCmd(func.getEntryPoint(), parsed, SourceType.USER_DEFINED)
            outcome["applied"] = cmd.applyTo(self.program, getMonitor())
            outcome["status"] = cmd.getStatusMsg()

        with_transaction(self.program, "Set function signature", do_set_sig)
        if not outcome["applied"]:
            reason = outcome["status"] or "unknown error"
            return {"success": False, "error": {"code": "SIGNATURE_ERROR", "message": "Could not apply signature: %s" % reason}}
        return {"success": True, "result": {
            "address": addr,
            "signature": sig_str,
            "message": "Signature updated",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "SIGNATURE_ERROR", "message": "Could not parse signature: %s" % str(e)}}
def handle_signature(self, exchange, addr_str):
    """PUT /functions/{address}/signature -- set the signature of a function
    found by address."""
    return self._handle_signature_impl(exchange, addr_str, False)
def handle_signature_by_name(self, exchange, name):
    """PUT /functions/by-name/{name}/signature -- set the signature of a
    function found by name."""
    return self._handle_signature_impl(exchange, name, True)
# ==================================================================
# Data Handlers
# ==================================================================
def handle_data_list(self, exchange):
    """GET /data -- list defined data, or look up the data at one address.

    Query parameters:
        addr:          return only the data item defined exactly at this
                       address (empty result when there is none or the
                       address is invalid); the other filters are ignored.
        limit, offset: paging over the filtered items (defaults 100 / 0).
        type:          case-insensitive substring of the data type name.
        name:          exact primary-symbol name.
        name_contains: case-insensitive substring of the primary-symbol name.

    Values longer than 200 characters are cut and suffixed with "...". Each
    listed item links to itself via the "addr" lookup of this endpoint.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    addr_filter = params.get("addr")
    name_filter = params.get("name")
    name_contains = params.get("name_contains")
    type_filter = params.get("type")

    def build_item(data, sym):
        # Shared by the single-address lookup and the listing.
        value = data.getDefaultValueRepresentation()
        if value and len(value) > 200:
            value = value[:200] + "..."
        item = {
            "address": str(data.getAddress()),
            "type": data.getDataType().getName(),
            "length": data.getLength(),
            "value": value,
        }
        if sym:
            item["name"] = sym.getName()
        return item

    # Single address lookup
    if addr_filter:
        empty = {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit}
        try:
            addr = self.program.getAddressFactory().getAddress(addr_filter)
            if addr is None:
                return empty
            data = self.program.getListing().getDataAt(addr)
            if not data:
                return empty
            sym = self.program.getSymbolTable().getPrimarySymbol(addr)
            return {"success": True, "result": [build_item(data, sym)], "size": 1, "offset": 0, "limit": 1}
        except:
            # Any lookup failure is reported as "nothing at this address".
            return empty

    type_needle = type_filter.lower() if type_filter else None
    contains_needle = name_contains.lower() if name_contains else None
    data_items = []
    listing = self.program.getListing()
    st = self.program.getSymbolTable()
    skipped = 0
    for data in listing.getDefinedData(True):
        if len(data_items) >= limit:
            break
        # Type filter
        if type_needle and type_needle not in data.getDataType().getName().lower():
            continue
        address = data.getAddress()
        sym = None
        # Name filters (require symbol lookup)
        if name_filter or contains_needle:
            sym = st.getPrimarySymbol(address)
            sym_name = sym.getName() if sym else ""
            if name_filter and sym_name != name_filter:
                continue
            if contains_needle and contains_needle not in sym_name.lower():
                continue
        if skipped < offset:
            skipped += 1
            continue
        if sym is None:
            sym = st.getPrimarySymbol(address)
        item = build_item(data, sym)
        # There is no /data/{address} route; the single-item view is the
        # "addr" query of this endpoint.
        item["_links"] = {"self": make_link("/data?addr=%s" % str(address))}
        data_items.append(item)
    return {
        "success": True,
        "result": data_items,
        "offset": offset,
        "limit": limit,
    }
def handle_data_create(self, exchange):
    """Create a label or a typed data item at an address.

    Reads a JSON request body. ``address`` is required. When ``newName`` is
    present a USER_DEFINED label is created and ``type`` is not consulted;
    otherwise ``type`` is resolved with resolve_data_type() and data of that
    type is created at the address.

    Returns a response dict. Successful data creation returns a
    ``(dict, 201)`` tuple instead (presumably the HTTP status; confirm
    against the dispatcher).
    """
    if not self.program:
        return self._no_program()

    body = parse_json_body(exchange)
    addr_str = body.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}}

    invalid_address = {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        # Kept bare as in the original. NOTE(review): narrowing this may stop
        # catching Java exceptions under Jython; confirm before changing.
        return invalid_address
    # getAddress() reports an unparseable string by returning null rather
    # than raising, so the lookup result has to be checked explicitly.
    if addr is None:
        return invalid_address

    # Label creation (newName field)
    new_name = body.get("newName")
    if new_name:
        def do_label():
            self.program.getSymbolTable().createLabel(addr, new_name, SourceType.USER_DEFINED)
        try:
            with_transaction(self.program, "Create label", do_label)
            return {"success": True, "result": {"address": addr_str, "name": new_name, "message": "Label created"}}
        except Exception as e:
            return {"success": False, "error": {"code": "LABEL_ERROR", "message": str(e)}}

    # Data creation (type field)
    type_name = body.get("type")
    if not type_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'type' or 'newName'"}}
    dtm = self.program.getDataTypeManager()
    dt = resolve_data_type(dtm, type_name)
    if not dt:
        return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}}

    def do_create_data():
        from ghidra.program.model.data import DataUtilities
        DataUtilities.createData(
            self.program, addr, dt, -1, False,
            DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA
        )
    try:
        with_transaction(self.program, "Create data", do_create_data)
        return ({"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data created"}}, 201)
    except Exception as e:
        return {"success": False, "error": {"code": "DATA_ERROR", "message": str(e)}}
def handle_data_delete(self, exchange):
    """Clear the code unit at an address.

    Reads ``address`` from the JSON request body and calls
    Listing.clearCodeUnits(addr, addr, False) inside a transaction.

    Returns a response dict: success with the address, or an error whose
    code is MISSING_PARAMETER, INVALID_ADDRESS or DELETE_ERROR.
    """
    if not self.program:
        return self._no_program()

    body = parse_json_body(exchange)
    addr_str = body.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}}

    invalid_address = {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        # Kept bare as in the original. NOTE(review): narrowing this may stop
        # catching Java exceptions under Jython; confirm before changing.
        return invalid_address
    # getAddress() returns null for a string it cannot parse; without this
    # check the null would be handed to clearCodeUnits().
    if addr is None:
        return invalid_address

    def do_delete():
        self.program.getListing().clearCodeUnits(addr, addr, False)
    try:
        with_transaction(self.program, "Delete data", do_delete)
        return {"success": True, "result": {"address": addr_str, "message": "Data deleted"}}
    except Exception as e:
        return {"success": False, "error": {"code": "DELETE_ERROR", "message": str(e)}}
def handle_data_type(self, exchange):
    """Change data type at an address (clear + recreate).

    Reads ``address`` and ``type`` from the JSON request body, resolves the
    type with resolve_data_type(), then clears the code unit at the address
    and creates data of the new type there in a single transaction.

    Returns a response dict: success, or an error whose code is
    MISSING_PARAMETER, INVALID_ADDRESS, UNKNOWN_TYPE or TYPE_ERROR.
    """
    if not self.program:
        return self._no_program()

    body = parse_json_body(exchange)
    addr_str = body.get("address")
    type_name = body.get("type")
    if not addr_str or not type_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' and/or 'type'"}}

    invalid_address = {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        # Kept bare as in the original. NOTE(review): narrowing this may stop
        # catching Java exceptions under Jython; confirm before changing.
        return invalid_address
    # getAddress() returns null for a string it cannot parse, so the failure
    # has to be detected here rather than in the except clause.
    if addr is None:
        return invalid_address

    dtm = self.program.getDataTypeManager()
    dt = resolve_data_type(dtm, type_name)
    if not dt:
        return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}}

    def do_retype():
        from ghidra.program.model.data import DataUtilities
        # Clear first so the existing definition does not conflict.
        self.program.getListing().clearCodeUnits(addr, addr, False)
        DataUtilities.createData(
            self.program, addr, dt, -1, False,
            DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA
        )
    try:
        with_transaction(self.program, "Change data type", do_retype)
        return {"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data type changed"}}
    except Exception as e:
        return {"success": False, "error": {"code": "TYPE_ERROR", "message": str(e)}}
# ==================================================================
# Strings Handler
# ==================================================================
def handle_strings(self, exchange):
    """List defined string data, with paging and optional filtering.

    Query parameters: ``limit`` (default 2000), ``offset`` (default 0),
    ``filter`` (case-insensitive substring of the string value) and
    ``min_length`` (default 2).

    A data item qualifies when its type name contains "string" or is exactly
    "char"/"wchar" (compared in lower case) and it has a non-empty value.
    Each entry reports the value cut to 200 characters, the full length,
    the number of references to it and, if present, its primary symbol name.
    Items that raise while being inspected are skipped. ``size`` in the
    response is the number of entries returned, not the total match count.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 2000)
    offset = parse_int(query.get("offset"), 0)
    needle = query.get("filter")
    min_length = parse_int(query.get("min_length"), 2)

    found = []
    defined_data = self.program.getListing().getDefinedData(True)
    reference_manager = self.program.getReferenceManager()
    passed_over = 0

    for data in defined_data:
        if len(found) >= limit:
            break
        try:
            data_type = data.getDataType()
            if not data_type:
                continue
            lowered_type = data_type.getName().lower()
            if not ("string" in lowered_type or lowered_type in ("char", "wchar")):
                continue

            raw_value = data.getValue()
            if not raw_value:
                continue
            text = str(raw_value)
            if len(text) < min_length:
                continue
            if needle and needle.lower() not in text.lower():
                continue

            # Matching items before the requested offset are counted, not returned.
            if passed_over < offset:
                passed_over += 1
                continue

            entry = {
                "address": str(data.getAddress()),
                "value": text[:200],
                "length": len(text),
                "type": data_type.getName(),
            }

            # Xref count: the iterator has to be drained to count it.
            incoming = reference_manager.getReferencesTo(data.getAddress())
            xref_total = 0
            while incoming.hasNext():
                incoming.next()
                xref_total += 1
            entry["xrefCount"] = xref_total

            # Label name
            primary = self.program.getSymbolTable().getPrimarySymbol(data.getAddress())
            if primary:
                entry["name"] = primary.getName()

            found.append(entry)
        except:
            # Best effort: an item that cannot be inspected is left out.
            pass

    return {
        "success": True,
        "result": found,
        "size": len(found),
        "offset": offset,
        "limit": limit,
    }
# ==================================================================
# Memory Handlers
# ==================================================================
def handle_memory_read(self, exchange):
    """Read bytes from program memory.

    Query parameters: ``address`` (required), ``length`` (default 16, must
    be 1-4096) and ``format`` ("hex" by default; "base64" and "string" are
    also recognised, any other value is rendered as hex).

    In the result, ``bytes`` holds the rendering selected by ``format``;
    ``hexBytes`` and ``rawBytes`` always hold the hex rendering. With
    "string", bytes outside 32..126 are shown as '.'.

    Returns a response dict: success, or an error whose code is
    MISSING_PARAMETER, INVALID_PARAMETER, INVALID_ADDRESS or MEMORY_ERROR.
    """
    if not self.program:
        return self._no_program()

    params = parse_query_params(exchange)
    addr_str = params.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}}
    length = parse_int(params.get("length"), 16)
    fmt = params.get("format", "hex")
    if length <= 0 or length > 4096:
        return {"success": False, "error": {"code": "INVALID_PARAMETER", "message": "Length must be 1-4096"}}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        # getAddress() returns null for a string it cannot parse; report that
        # directly instead of letting the null reach Memory.getBytes().
        if addr is None:
            return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

        from jarray import zeros
        byte_array = zeros(length, 'b')
        bytes_read = self.program.getMemory().getBytes(addr, byte_array)

        # Only the bytes actually read are reported; render them as hex once
        # and reuse that text for every field that needs it.
        data = byte_array[:bytes_read]
        hex_text = bytes_to_hex(data)

        if fmt == "base64":
            from java.util import Base64
            # Convert to unsigned bytes for Base64
            unsigned = bytearray([(b & 0xff) for b in data])
            formatted = Base64.getEncoder().encodeToString(unsigned)
        elif fmt == "string":
            chars = []
            for signed in data:
                b = signed & 0xff
                if 32 <= b < 127:
                    chars.append(chr(b))
                else:
                    chars.append('.')
            formatted = ''.join(chars)
        else:
            formatted = hex_text

        return {
            "success": True,
            "result": {
                "address": str(addr),
                "bytesRead": bytes_read,
                "format": fmt,
                "bytes": formatted,
                "hexBytes": hex_text,
                "rawBytes": hex_text,
            }
        }
    except Exception as e:
        return {"success": False, "error": {"code": "MEMORY_ERROR", "message": str(e)}}
def handle_memory_write(self, exchange, addr_str):
    """Write bytes to program memory at ``addr_str``.

    Reads ``bytes`` (required) and ``format`` from the JSON request body.
    ``format`` is "hex" by default; "base64" decodes the payload and
    "string" writes the ordinal of each character. Any other value is
    treated as hex and parsed with hex_to_bytes().

    Returns a response dict: success with the byte count written, or an
    error whose code is MISSING_PARAMETER, INVALID_ADDRESS or MEMORY_ERROR.
    """
    if not self.program:
        return self._no_program()

    body = parse_json_body(exchange)
    bytes_str = body.get("bytes")
    fmt = body.get("format", "hex")
    if not bytes_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'bytes'"}}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        # getAddress() returns null for a string it cannot parse; stop here
        # instead of passing the null on to Memory.setBytes().
        if addr is None:
            return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

        if fmt == "base64":
            from java.util import Base64
            byte_vals = list(Base64.getDecoder().decode(bytes_str))
        elif fmt == "string":
            byte_vals = [ord(c) for c in bytes_str]
        else:
            byte_vals = hex_to_bytes(bytes_str)

        # Convert to Java byte array
        from jarray import zeros
        java_bytes = zeros(len(byte_vals), 'b')
        for i, b in enumerate(byte_vals):
            # Values above 127 are mapped into the negative half of the
            # signed byte range.
            java_bytes[i] = b if b <= 127 else b - 256

        def do_write():
            self.program.getMemory().setBytes(addr, java_bytes)
        with_transaction(self.program, "Write memory", do_write)
        return {"success": True, "result": {"address": str(addr), "bytesWritten": len(byte_vals)}}
    except Exception as e:
        return {"success": False, "error": {"code": "MEMORY_ERROR", "message": str(e)}}
def handle_memory_blocks(self, exchange):
    """List memory blocks by delegating to handle_segments (alias for /segments)."""
    segments_response = self.handle_segments(exchange)
    return segments_response
def handle_set_comment(self, exchange, addr_str, comment_type):
    """Set, or clear, a comment of the given type at an address.

    ``comment_type`` is looked up in COMMENT_TYPE_MAP after lower-casing.
    The text comes from the ``comment`` field of the JSON body; an empty
    text is passed to the code unit as None. The comment goes on the code
    unit at the address, or failing that the code unit containing it.
    """
    if not self.program:
        return self._no_program()

    text = parse_json_body(exchange).get("comment", "")

    type_code = COMMENT_TYPE_MAP.get(comment_type.lower())
    if type_code is None:
        return {"success": False, "error": {
            "code": "INVALID_COMMENT_TYPE",
            "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type
        }}

    try:
        address = self.program.getAddressFactory().getAddress(addr_str)
        listing = self.program.getListing()

        # Prefer a unit starting exactly here, else the one covering it.
        unit = listing.getCodeUnitAt(address)
        if not unit:
            unit = listing.getCodeUnitContaining(address)
        if not unit:
            return {"success": False, "error": {"code": "NO_CODE_UNIT", "message": "No code unit at address: %s" % addr_str}}

        def do_comment():
            if text:
                unit.setComment(type_code, text)
            else:
                unit.setComment(type_code, None)

        with_transaction(self.program, "Set comment", do_comment)
        payload = {
            "address": addr_str,
            "commentType": comment_type,
            "comment": text,
            "message": "Comment set successfully",
        }
        return {"success": True, "result": payload}
    except Exception as e:
        return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}}
# ==================================================================
# Segments Handler
# ==================================================================
def handle_segments(self, exchange):
    """List the program's memory blocks as segment records.

    An optional ``name`` query parameter keeps only blocks whose name
    contains that text (case-sensitive substring match). Each record
    carries the block's bounds, size, permission flags and links to itself
    and to a 1024-byte memory read at its start.
    """
    if not self.program:
        return self._no_program()

    wanted = parse_query_params(exchange).get("name")

    records = []
    for block in self.program.getMemory().getBlocks():
        block_name = block.getName()
        if wanted and wanted not in block_name:
            continue

        start_text = str(block.getStart())
        record = {
            "name": block_name,
            "start": start_text,
            "end": str(block.getEnd()),
            "size": block.getSize(),
            "readable": block.isRead(),
            "writable": block.isWrite(),
            "executable": block.isExecute(),
            "initialized": block.isInitialized(),
            "_links": {
                "self": make_link("/segments/%s" % block_name),
                "memory": make_link("/memory?address=%s&length=1024" % start_text),
            },
        }
        records.append(record)

    return {"success": True, "result": records}
# ==================================================================
# Symbol Handlers
# ==================================================================
def handle_symbols(self, exchange):
    """List symbols from the symbol table, with paging and filters.

    Query parameters: ``limit`` (default 100), ``offset`` (default 0),
    ``name`` (case-insensitive substring of the symbol name) and ``type``
    (case-insensitive exact match on the symbol type's string form).
    The offset counts symbols that passed the filters.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    name_filter = query.get("name")
    type_filter = query.get("type")

    page = []
    passed_over = 0
    for sym in self.program.getSymbolTable().getAllSymbols(True):
        if len(page) >= limit:
            break

        name_ok = not name_filter or name_filter.lower() in sym.getName().lower()
        if not name_ok:
            continue
        type_ok = not type_filter or str(sym.getSymbolType()).lower() == type_filter.lower()
        if not type_ok:
            continue

        if passed_over < offset:
            passed_over += 1
            continue

        page.append({
            "name": sym.getName(),
            "address": str(sym.getAddress()),
            "namespace": sym.getParentNamespace().getName(),
            "type": str(sym.getSymbolType()),
            "isPrimary": sym.isPrimary(),
            "isExternal": sym.isExternal(),
        })

    links = {
        "self": make_link("/symbols"),
        "imports": make_link("/symbols/imports"),
        "exports": make_link("/symbols/exports"),
    }
    return {
        "success": True,
        "result": page,
        "offset": offset,
        "limit": limit,
        "_links": links,
    }
def handle_imports(self, exchange):
    """List external symbols (imports) with ``limit``/``offset`` paging.

    ``limit`` defaults to 100 and ``offset`` to 0. Each entry carries the
    symbol's name, address and parent namespace name.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)

    page = []
    still_to_skip = offset
    for sym in self.program.getSymbolTable().getExternalSymbols():
        if len(page) >= limit:
            break
        if still_to_skip > 0:
            still_to_skip -= 1
            continue
        entry = {
            "name": sym.getName(),
            "address": str(sym.getAddress()),
            "namespace": sym.getParentNamespace().getName(),
        }
        page.append(entry)

    return {"success": True, "result": page, "offset": offset, "limit": limit}
def handle_exports(self, exchange):
    """List symbols flagged as external entry points (exports), paged.

    ``limit`` defaults to 100 and ``offset`` to 0; the offset counts only
    symbols that are external entry points. Each entry carries the symbol's
    name and address.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)

    page = []
    still_to_skip = offset
    for sym in self.program.getSymbolTable().getAllSymbols(True):
        if len(page) >= limit:
            break
        if sym.isExternalEntryPoint():
            if still_to_skip > 0:
                still_to_skip -= 1
            else:
                page.append({
                    "name": sym.getName(),
                    "address": str(sym.getAddress()),
                })

    return {"success": True, "result": page, "offset": offset, "limit": limit}
# ==================================================================
# Cross-References Handler
# ==================================================================
def handle_xrefs(self, exchange):
    """List cross-references to and/or from an address.

    Query parameters: ``to_addr`` and/or ``from_addr`` (at least one is
    required), ``type`` (case-insensitive exact match on the reference type
    name), ``limit`` (default 100) and ``offset`` (default 0).

    When both addresses are given, references to ``to_addr`` are listed
    first, then references from ``from_addr``; ``limit`` and ``offset`` are
    applied to each direction separately.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    to_addr_str = query.get("to_addr")
    from_addr_str = query.get("from_addr")
    type_filter = query.get("type")
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)

    if not (to_addr_str or from_addr_str):
        return {"success": False, "error": {
            "code": "MISSING_PARAMETER",
            "message": "Either 'to_addr' or 'from_addr' parameter is required"
        }}

    def wrong_type(ref):
        # True when a type filter is set and this reference does not match it.
        if not type_filter:
            return False
        type_name = self._get_ref_type_name(ref.getReferenceType())
        return type_name.lower() != type_filter.lower()

    collected = []
    reference_manager = self.program.getReferenceManager()
    try:
        if to_addr_str:
            target = self.program.getAddressFactory().getAddress(to_addr_str)
            incoming = reference_manager.getReferencesTo(target)
            taken = 0
            passed_over = 0
            while incoming.hasNext() and taken < limit:
                ref = incoming.next()
                if wrong_type(ref):
                    continue
                if passed_over < offset:
                    passed_over += 1
                    continue
                collected.append(self._build_xref_info(ref))
                taken += 1

        if from_addr_str:
            origin = self.program.getAddressFactory().getAddress(from_addr_str)
            outgoing = reference_manager.getReferencesFrom(origin)
            taken = 0
            passed_over = 0
            for ref in outgoing:
                if taken >= limit:
                    break
                if wrong_type(ref):
                    continue
                if passed_over < offset:
                    passed_over += 1
                    continue
                collected.append(self._build_xref_info(ref))
                taken += 1

        return {"success": True, "result": collected, "offset": offset, "limit": limit}
    except Exception as e:
        return {"success": False, "error": {"code": "XREF_ERROR", "message": str(e)}}
# ==================================================================
# Classes & Namespaces Handlers
# ==================================================================
def handle_classes(self, exchange):
    """List the distinct non-global parent namespaces of all symbols.

    A parent namespace is included when its symbol's type reports
    isNamespace(). Full names are sorted and then sliced by ``offset``
    (default 0) and ``limit`` (default 100). Each entry splits the full
    name at the last "::" into ``namespace`` and ``simpleName``; names
    without "::" get the namespace "global". ``size`` is the total number
    of names before paging.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)

    qualified_names = set()
    for sym in self.program.getSymbolTable().getAllSymbols(True):
        parent = sym.getParentNamespace()
        if not parent or parent.isGlobal():
            continue
        if parent.getSymbol().getSymbolType().isNamespace():
            qualified_names.add(parent.getName(True))

    ordered = sorted(qualified_names)
    total = len(ordered)
    first = min(offset, total)
    last = min(offset + limit, total)

    page = []
    for full_name in ordered[first:last]:
        if "::" in full_name:
            owner, _sep, simple = full_name.rpartition("::")
        else:
            owner, simple = "global", full_name
        page.append({"name": full_name, "namespace": owner, "simpleName": simple})

    return {
        "success": True,
        "result": page,
        "size": total,
        "offset": offset,
        "limit": limit,
    }
def handle_namespaces(self, exchange):
    """List the distinct non-global parent namespaces of all symbols.

    Full namespace names are sorted and then sliced by ``offset``
    (default 0) and ``limit`` (default 100). ``size`` is the total number
    of names before paging.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)

    seen = set()
    for sym in self.program.getSymbolTable().getAllSymbols(True):
        parent = sym.getParentNamespace()
        if not parent or parent.isGlobal():
            continue
        seen.add(parent.getName(True))

    ordered = sorted(seen)
    total = len(ordered)
    first = min(offset, total)
    last = min(offset + limit, total)

    return {
        "success": True,
        "result": ordered[first:last],
        "size": total,
        "offset": offset,
        "limit": limit,
    }
# ==================================================================
# Structs Handlers
# ==================================================================
def handle_structs(self, exchange):
    """List structure and union data types, or describe one by name.

    With a ``name`` query parameter the request is handed to
    _handle_struct_detail(). Otherwise the data type manager is scanned
    for Structure and Union types, optionally kept only when ``category``
    occurs (case-insensitively) in the category path, and paged with
    ``limit`` (default 100) and ``offset`` (default 0).
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)

    # Detail lookup by name
    requested = query.get("name")
    if requested:
        return self._handle_struct_detail(requested)

    # List structs
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    category_filter = query.get("category")

    from ghidra.program.model.data import Structure, Union

    page = []
    passed_over = 0
    all_types = self.program.getDataTypeManager().getAllDataTypes()
    while all_types.hasNext() and len(page) < limit:
        candidate = all_types.next()
        if not isinstance(candidate, (Structure, Union)):
            continue
        if category_filter and category_filter.lower() not in candidate.getCategoryPath().getPath().lower():
            continue
        if passed_over < offset:
            passed_over += 1
            continue

        if isinstance(candidate, Structure):
            kind = "struct"
        else:
            kind = "union"
        page.append({
            "name": candidate.getName(),
            "category": candidate.getCategoryPath().getPath(),
            "path": candidate.getPathName(),
            "size": candidate.getLength(),
            "type": kind,
            "numFields": candidate.getNumComponents(),
            "_links": {"self": make_link("/structs?name=%s" % candidate.getName())},
        })

    return {"success": True, "result": page, "offset": offset, "limit": limit}
def _handle_struct_detail(self, name):
    """Describe the data type found by _find_struct(name).

    Returns STRUCT_NOT_FOUND when the lookup fails. Otherwise the result
    holds the type's name, category, path, size and description; when the
    type is a Composite it also lists every component as a field, with
    ``comment`` included only for components that have one.
    """
    from ghidra.program.model.data import Composite

    found = self._find_struct(name)
    if not found:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}}

    detail = {
        "name": found.getName(),
        "category": found.getCategoryPath().getPath(),
        "path": found.getPathName(),
        "size": found.getLength(),
        "description": found.getDescription() or "",
    }

    if isinstance(found, Composite):
        field_list = []
        for component in found.getComponents():
            component_type = component.getDataType()
            entry = {
                "name": component.getFieldName(),
                "type": component_type.getName(),
                "typePath": component_type.getPathName(),
                "offset": component.getOffset(),
                "length": component.getLength(),
            }
            note = component.getComment()
            if note:
                entry["comment"] = note
            field_list.append(entry)
        detail["fields"] = field_list
        detail["numFields"] = len(field_list)

    detail["_links"] = {
        "self": make_link("/structs?name=%s" % name),
        "structs": make_link("/structs"),
    }
    return {"success": True, "result": detail}
def handle_struct_create(self, exchange):
    """Create an empty structure data type.

    Reads ``name`` (required), ``category`` (default "/") and
    ``description`` from the JSON body, builds a zero-length
    StructureDataType and adds it to the data type manager with the default
    conflict handler inside a transaction.

    Returns a ``(dict, 201)`` tuple on success, or an error dict whose code
    is MISSING_PARAMETER or STRUCT_ERROR.
    """
    if not self.program:
        return self._no_program()

    body = parse_json_body(exchange)
    name = body.get("name")
    if not name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}}
    category = body.get("category", "/")
    description = body.get("description", "")

    try:
        from ghidra.program.model.data import StructureDataType, CategoryPath, DataTypeConflictHandler

        draft = StructureDataType(CategoryPath(category), name, 0)
        if description:
            draft.setDescription(description)

        # The callback cannot return a value, so it stores the added type here.
        added = {"dt": None}

        def do_create():
            manager = self.program.getDataTypeManager()
            added["dt"] = manager.addDataType(draft, DataTypeConflictHandler.DEFAULT_HANDLER)

        with_transaction(self.program, "Create struct", do_create)

        stored = added["dt"]
        if stored:
            final_name = stored.getName()
            final_path = stored.getPathName()
        else:
            final_name = name
            final_path = "%s/%s" % (category, name)

        payload = {
            "name": final_name,
            "path": final_path,
            "category": category,
            "size": 0,
            "message": "Struct created successfully",
        }
        return ({"success": True, "result": payload}, 201)
    except Exception as e:
        return {"success": False, "error": {"code": "STRUCT_ERROR", "message": str(e)}}
def handle_struct_addfield(self, exchange):
    """Add a field to an existing struct.

    Reads ``struct``, ``fieldName`` and ``fieldType`` (all required) plus
    optional ``offset`` and ``comment`` from the JSON body. With ``offset``
    the field is inserted at that offset; without it the field is appended
    and the reported offset is the struct length minus the field length.
    """
    if not self.program:
        return self._no_program()

    body = parse_json_body(exchange)
    struct_name = body.get("struct")
    field_name = body.get("fieldName")
    field_type = body.get("fieldType")
    if not (struct_name and field_name and field_type):
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct', 'fieldName', or 'fieldType'"}}

    target = self._find_struct(struct_name)
    if not target:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}}

    field_dt = resolve_data_type(self.program.getDataTypeManager(), field_type)
    if not field_dt:
        return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % field_type}}

    requested_offset = body.get("offset")
    comment = body.get("comment")

    # Filled in by the callback, which cannot return a value.
    outcome = {}

    def do_add():
        if requested_offset is None:
            target.add(field_dt, field_dt.getLength(), field_name, comment)
            outcome["offset"] = target.getLength() - field_dt.getLength()
        else:
            position = int(requested_offset)
            target.insertAtOffset(position, field_dt, field_dt.getLength(), field_name, comment)
            outcome["offset"] = position
        outcome["length"] = field_dt.getLength()
        outcome["structSize"] = target.getLength()

    try:
        with_transaction(self.program, "Add struct field", do_add)
    except Exception as e:
        return {"success": False, "error": {"code": "FIELD_ERROR", "message": str(e)}}

    return {"success": True, "result": {
        "struct": struct_name,
        "fieldName": field_name,
        "fieldType": field_type,
        "offset": outcome.get("offset", 0),
        "length": outcome.get("length", 0),
        "structSize": outcome.get("structSize", 0),
        "message": "Field added successfully",
    }}
def handle_struct_updatefield(self, exchange):
    """Rename, retype and/or re-comment one field of a struct.

    Reads from the JSON body: ``struct`` (required); ``fieldOffset`` or
    ``fieldName`` to pick the field (the offset wins when both are given);
    and at least one of ``newName``, ``newType``, ``newComment``.

    Returns a response dict: success with the field's original and new
    attributes, or an error whose code is MISSING_PARAMETER,
    STRUCT_NOT_FOUND, INVALID_PARAMETER, FIELD_NOT_FOUND, UNKNOWN_TYPE or
    UPDATE_ERROR.
    """
    if not self.program:
        return self._no_program()

    body = parse_json_body(exchange)
    struct_name = body.get("struct")
    if not struct_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct'"}}
    struct = self._find_struct(struct_name)
    if not struct:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}}

    field_name = body.get("fieldName")
    field_offset = body.get("fieldOffset")
    new_name = body.get("newName")
    new_type = body.get("newType")
    new_comment = body.get("newComment")
    if not new_name and not new_type and new_comment is None:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide at least one of: newName, newType, newComment"}}

    # Find the component
    component = None
    if field_offset is not None:
        # A non-numeric offset is a client error; report it instead of
        # letting the conversion raise out of the handler.
        try:
            offset_value = int(field_offset)
        except (TypeError, ValueError):
            return {"success": False, "error": {"code": "INVALID_PARAMETER", "message": "Invalid 'fieldOffset': %s" % (field_offset,)}}
        component = struct.getComponentAt(offset_value)
    elif field_name:
        for comp in struct.getComponents():
            if comp.getFieldName() == field_name:
                component = comp
                break
    if not component:
        return {"success": False, "error": {"code": "FIELD_NOT_FOUND", "message": "Field not found"}}

    # Resolve the replacement type before changing anything, so an unknown
    # type is reported as an error rather than silently skipped while the
    # response claims the field was retyped.
    new_dt = None
    if new_type:
        new_dt = resolve_data_type(self.program.getDataTypeManager(), new_type)
        if not new_dt:
            return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % new_type}}

    # Captured before the update so the response can show what changed.
    orig_name = component.getFieldName()
    orig_type = component.getDataType().getName()
    orig_comment = component.getComment()

    def do_update():
        if new_name:
            component.setFieldName(new_name)
        if new_comment is not None:
            component.setComment(new_comment)
        if new_dt:
            component.setDataType(new_dt)

    try:
        with_transaction(self.program, "Update struct field", do_update)
        return {"success": True, "result": {
            "struct": struct_name,
            "offset": component.getOffset(),
            "originalName": orig_name,
            "originalType": orig_type,
            "originalComment": orig_comment,
            "newName": new_name or orig_name,
            "newType": new_type or orig_type,
            "newComment": new_comment if new_comment is not None else orig_comment,
            "length": component.getLength(),
            "message": "Field updated successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "UPDATE_ERROR", "message": str(e)}}
def handle_struct_delete(self, exchange):
    """Remove a struct data type from the data type manager.

    Reads ``name`` (required) from the JSON body, looks the type up with
    _find_struct() and removes it inside a transaction. The category and
    path in the response are captured before the removal.
    """
    if not self.program:
        return self._no_program()

    name = parse_json_body(exchange).get("name")
    if not name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}}

    doomed = self._find_struct(name)
    if not doomed:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}}

    summary = {
        "name": name,
        "path": doomed.getPathName(),
        "category": doomed.getCategoryPath().getPath(),
        "message": "Struct deleted successfully",
    }

    def do_delete():
        self.program.getDataTypeManager().remove(doomed, getMonitor())

    try:
        with_transaction(self.program, "Delete struct", do_delete)
    except Exception as e:
        return {"success": False, "error": {"code": "DELETE_ERROR", "message": str(e)}}
    return {"success": True, "result": summary}
# ==================================================================
# Analysis Handlers
# ==================================================================
def handle_analysis_info(self, exchange):
    """Describe the loaded program and the analysis endpoints on offer.

    Reports the program name, processor, default address space size and
    language, a fixed list of available analyses, and links to the related
    endpoints. ``exchange`` is not read.
    """
    if not self.program:
        return self._no_program()

    program = self.program
    language = program.getLanguage()

    links = {
        "self": make_link("/analysis"),
        "callgraph": make_link("/analysis/callgraph"),
        "dataflow": make_link("/analysis/dataflow"),
        "program": make_link("/program"),
    }
    info = {
        "program": program.getName(),
        "processor": str(language.getProcessor()),
        "addressSize": program.getAddressFactory().getDefaultAddressSpace().getSize(),
        "programLanguage": str(language),
        "availableAnalysis": ["callgraph", "xrefs", "decompile", "dataflow"],
        "_links": links,
    }
    return {"success": True, "result": info}
def handle_analysis_run(self, exchange):
    """Try to re-run auto-analysis on the loaded program.

    Calls the script-level analyzeAll() on the program. The response is
    always ``success: True``; ``analysis_triggered`` tells whether the call
    completed, and on failure the message carries the exception text.
    ``exchange`` is not read.
    """
    if not self.program:
        return self._no_program()
    try:
        # In headless mode, try to re-run auto-analysis
        analyzeAll(self.program)
        return {"success": True, "result": {
            "program": self.program.getName(),
            "analysis_triggered": True,
            "message": "Analysis initiated on program",
        }}
    except Exception as e:
        # analyzeAll may not be available; graceful fallback
        return {"success": True, "result": {
            "program": self.program.getName(),
            "analysis_triggered": False,
            "message": "Analysis request received (headless mode: %s)" % str(e),
        }}
def handle_callgraph(self, exchange):
    """Build a call graph outward from one function.

    Query parameters: ``address`` or ``name`` to pick the root (the address
    wins when both are given) and ``max_depth`` (default 3). With neither,
    the function at the first external entry point is used.

    The graph is collected by _build_callgraph() and returned as node and
    edge lists with their counts.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    func_name = query.get("name")
    func_addr = query.get("address")
    max_depth = parse_int(query.get("max_depth"), 3)

    if func_addr:
        root = self._find_function_at(func_addr)
    elif func_name:
        root = self._find_function_by_name(func_name)
    else:
        # Use first entry point
        root = None
        entry_points = self.program.getSymbolTable().getExternalEntryPointIterator()
        if entry_points.hasNext():
            root = self.program.getFunctionManager().getFunctionAt(entry_points.next())

    if not root:
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found for callgraph"}}

    nodes, edges = [], []
    self._build_callgraph(root, nodes, edges, set(), 0, max_depth)

    return {"success": True, "result": {
        "root": root.getName(),
        "rootAddress": str(root.getEntryPoint()),
        "maxDepth": max_depth,
        "nodes": nodes,
        "edges": edges,
        "nodeCount": len(nodes),
        "edgeCount": len(edges),
    }}
def _build_callgraph(self, func, nodes, edges, visited, depth, max_depth):
    """Record ``func`` as a node, then recurse depth-first into its callees.

    ``nodes``, ``edges`` and ``visited`` are filled in place. A function
    already in ``visited`` is ignored. Callees are not explored once
    ``depth`` reaches ``max_depth``. An edge is added for every call
    reference found in the function body whose target is the entry of a
    known function, even when that callee was already visited.
    """
    entry = str(func.getEntryPoint())
    if entry in visited:
        return
    visited.add(entry)

    nodes.append({"id": entry, "name": func.getName(), "address": entry, "depth": depth})

    if depth >= max_depth:
        return

    reference_manager = self.program.getReferenceManager()
    body_addresses = func.getBody().getAddresses(True)
    while body_addresses.hasNext():
        call_site = body_addresses.next()
        for ref in reference_manager.getReferencesFrom(call_site):
            if not ref.getReferenceType().isCall():
                continue
            callee = self.program.getFunctionManager().getFunctionAt(ref.getToAddress())
            if not callee:
                continue
            edges.append({
                "from": entry,
                "to": str(callee.getEntryPoint()),
                "type": "call",
                "callSite": str(call_site),
            })
            self._build_callgraph(callee, nodes, edges, visited, depth + 1, max_depth)
def handle_dataflow(self, exchange):
    """Walk instructions from an address inside its containing function.

    Query parameters: ``address`` (required), ``direction`` ("forward" by
    default; any other value walks backward) and ``max_steps`` (default
    50). This is a simplified dataflow: it lists instructions in address
    order and stops at the first one outside the function body.
    ``sources`` and ``sinks`` are always empty lists.

    Returns a response dict: success, or an error whose code is
    MISSING_PARAMETER, INVALID_ADDRESS, FUNCTION_NOT_FOUND or
    DATAFLOW_ERROR.
    """
    if not self.program:
        return self._no_program()

    params = parse_query_params(exchange)
    addr_str = params.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}}
    direction = params.get("direction", "forward")
    max_steps = parse_int(params.get("max_steps"), 50)

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        # getAddress() returns null for a string it cannot parse; report
        # that as a bad address rather than as a missing function.
        if addr is None:
            return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

        func = self.program.getFunctionManager().getFunctionContaining(addr)
        if not func:
            return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function containing address: %s" % addr_str}}

        # Simplified dataflow: walk instructions from the starting address
        listing = self.program.getListing()
        inst_iter = listing.getInstructions(addr, direction == "forward")

        steps = []
        while inst_iter.hasNext() and len(steps) < max_steps:
            instr = inst_iter.next()
            # Stay within function body
            if not func.getBody().contains(instr.getAddress()):
                break
            steps.append({
                "address": str(instr.getAddress()),
                "instruction": str(instr),
                "mnemonic": instr.getMnemonicString(),
                "bytes": bytes_to_hex(instr.getBytes()),
            })

        return {
            "success": True,
            "result": {
                "start_address": addr_str,
                "function": func.getName(),
                "direction": direction,
                "max_steps": max_steps,
                "total_steps": len(steps),
                "steps": steps,
                "sources": [],
                "sinks": [],
            }
        }
    except Exception as e:
        return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}}
# ==================================================================
# Legacy Compatibility
# ==================================================================
def handle_decompile_legacy(self, exchange):
    """Handle GET /decompile?name=X or ?address=X (backwards compat).

    The ``name`` parameter is preferred over ``address``. Whichever value
    is used is tried as a function name first and as an address second.
    The success response puts ``name``, ``address`` and ``decompiled`` at
    the top level instead of under ``result``.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    target = query.get("name") or query.get("address")
    if not target:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name' or 'address' parameter"}}

    # Try by name first, then by address
    func = self._find_function_by_name(target) or self._find_function_at(target)
    if not func:
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % target}}

    outcome = self.decompiler.decompileFunction(func, 30, getMonitor())
    if not outcome or not outcome.decompileCompleted():
        return {"success": False, "error": {"code": "DECOMPILE_FAILED", "message": "Decompilation failed"}}

    c_source = outcome.getDecompiledFunction().getC()
    return {
        "success": True,
        "name": func.getName(),
        "address": str(func.getEntryPoint()),
        "decompiled": c_source,
    }
# ========================================================================
# Server Startup
# ========================================================================
def run_server(port, program, decompiler):
    """Create and start the HTTP server, returning the running instance.

    One GhydraMCPHandler is registered on "/" so it receives every request,
    and requests are served from a cached thread pool.
    """
    bind_address = InetSocketAddress(port)
    http_server = HttpServer.create(bind_address, 0)

    catch_all = GhydraMCPHandler(program, decompiler)
    http_server.createContext("/", catch_all)

    http_server.setExecutor(Executors.newCachedThreadPool())
    http_server.start()

    println("[GhydraMCP] HTTP server started on port %d" % port)
    return http_server
# ========================================================================
# Main Entry Point
# ========================================================================
def main():
    """Script entry point: start the server and keep the script alive.

    The port is taken from the first script argument when it parses as an
    integer, otherwise DEFAULT_PORT is used. A decompiler is opened on the
    current program, a banner is printed, the server is started, and the
    function then sleeps in a loop until a KeyboardInterrupt stops it.
    """
    port = DEFAULT_PORT

    # Parse port from script arguments
    script_args = getScriptArgs()
    if script_args and len(script_args) > 0:
        try:
            port = int(script_args[0])
        except:
            println("Invalid port number, using default: %d" % DEFAULT_PORT)

    # Initialize decompiler
    decompiler = DecompInterface()
    decompiler.openProgram(currentProgram)

    if currentProgram:
        program_label = currentProgram.getName()
    else:
        program_label = "None"

    rule = "========================================="
    banner = [
        rule,
        " GhydraMCP Headless HTTP Server",
        rule,
        " API Version: %s (compat: %d)" % (API_VERSION_STRING, API_VERSION),
        " Port: %d" % port,
        " Program: %s" % program_label,
        " Script: Python/Jython (Full API)",
        " Routes: %d" % len(ROUTES),
        rule,
    ]
    for line in banner:
        println(line)

    server = run_server(port, currentProgram, decompiler)

    println("")
    println("GhydraMCP Server running. Press Ctrl+C to stop.")
    println("API available at: http://localhost:%d/" % port)

    # Keep the script running
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        server.stop(0)
        println("[GhydraMCP] Server stopped.")
# Run: start the server when the script is executed. main() only returns
# after a KeyboardInterrupt ends its keep-alive loop.
main()