Sprint 3 (Symbol & Variable CRUD):
- Add symbols_create, symbols_rename, symbols_delete MCP tools
- Add variables_rename MCP tool with optional type change
- Implement corresponding Jython HTTP handlers in headless server

Sprint 4 (Bookmarks & Data Types):
- Add BookmarksMixin (bookmarks_list, bookmarks_create, bookmarks_delete)
- Add DataTypesMixin (enums_list, enums_create, typedefs_list, typedefs_create)
- Register both mixins in server.py, add resource caps in config.py

Fixes:
- Use resolve_data_type() for typedef creation and variable retyping (was missing builtin types like int, char, void)
- Fix docker_auto_start reusing containers with wrong binary loaded (now compares requested binary name against running instance)

Headless server (GhydraMCPServer.py): +14 routes, 58 total
MCP tools: 75 registered
Tested: 24/24 endpoint tests passing
2804 lines
106 KiB
Python
2804 lines
106 KiB
Python
# GhydraMCPServer.py - Headless Ghidra script for GhydraMCP HTTP API
|
|
# Full API parity with the Java plugin implementation.
|
|
# Python 2 / Jython compatible (no f-strings, no readAllBytes).
|
|
#
|
|
# Usage: analyzeHeadless <project> <name> -import <binary> -postScript GhydraMCPServer.py [port]
|
|
#
|
|
#@category GhydraMCP
|
|
#@keybinding
|
|
#@menupath
|
|
#@toolbar
|
|
|
|
# === Java imports ===
|
|
from com.sun.net.httpserver import HttpServer, HttpHandler
|
|
from java.net import InetSocketAddress, URLDecoder
|
|
from java.util.concurrent import Executors
|
|
from java.io import OutputStream, BufferedReader, InputStreamReader
|
|
|
|
# === Ghidra imports ===
|
|
from ghidra.app.decompiler import DecompInterface
|
|
from ghidra.program.model.symbol import SourceType
|
|
from ghidra.program.model.listing import CodeUnit
|
|
|
|
# === Python imports ===
|
|
import json
|
|
import re
|
|
import threading
|
|
import time
|
|
|
|
# === Constants ===
|
|
# Integer API version; reported to clients by handle_root as "api_version".
API_VERSION = 2 # Integer for MCP client compatibility (minimum expected: 2)
# Human-readable version; reported by handle_root as "api_version_string".
API_VERSION_STRING = "2.1"
# NOTE(review): presumably the listen port used when no [port] argument is
# given (see the usage line in the file header) - confirm in the startup code.
DEFAULT_PORT = 8192

# === Thread-safe transaction lock ===
# Held by with_transaction() around the whole start/end transaction sequence,
# so only one thread at a time runs a program-modifying callback.
_tx_lock = threading.Lock()
|
|
|
|
|
|
# ========================================================================
|
|
# Utility Functions
|
|
# ========================================================================
|
|
|
|
def url_decode(s):
    """Return ``s`` URL-decoded as UTF-8 via Java's URLDecoder.

    Decoding is best-effort: if anything goes wrong the input is handed
    back unchanged instead of raising.
    """
    decoded = s
    try:
        decoded = URLDecoder.decode(s, "UTF-8")
    except:
        # Deliberately broad: keep the original text on any failure.
        pass
    return decoded
|
|
|
|
|
|
def parse_query_params(exchange):
    """Parse the query string of an HttpExchange into a dict.

    The *raw* (still percent-encoded) query is split on ``&`` and ``=``
    first, and only then are keys and values URL-decoded.  Reading the
    already-decoded query (``getQuery()``) and decoding it again corrupts
    values: an encoded ``%2B`` ends up as a space and an encoded ``%26``
    splits one value into two parameters.

    Args:
        exchange: the HttpExchange of the current request.

    Returns:
        dict mapping decoded parameter names to decoded values.  A name
        without ``=`` maps to the empty string; on duplicates the last
        occurrence wins.
    """
    params = {}
    query = exchange.getRequestURI().getRawQuery()
    if not query:
        return params
    for part in query.split("&"):
        if not part:
            # Tolerate "a=1&&b=2" and a trailing "&".
            continue
        key, sep, value = part.partition("=")
        params[url_decode(key)] = url_decode(value) if sep else ""
    return params
|
|
|
|
|
|
def read_request_body(exchange):
    """Read the request body as a string (Jython-compatible, no readAllBytes).

    The body is read line by line as UTF-8 and the lines are re-joined with
    ``\\n``, so a trailing newline is dropped and line endings are normalized.

    Args:
        exchange: the HttpExchange of the current request.

    Returns:
        The body text; an empty string when there is no body.
    """
    reader = BufferedReader(InputStreamReader(exchange.getRequestBody(), "UTF-8"))
    lines = []
    try:
        line = reader.readLine()
        while line is not None:
            lines.append(line)
            line = reader.readLine()
    finally:
        # Close the stream even when a read fails part-way through.
        reader.close()
    return "\n".join(lines)
|
|
|
|
|
|
def parse_json_body(exchange):
    """Read and parse the JSON object sent as the HTTP request body.

    Callers in this file treat the result as a dict (``body.get(...)``), so
    anything that is not a JSON object is mapped to an empty dict as well.

    Args:
        exchange: the HttpExchange of the current request.

    Returns:
        The parsed dict, or ``{}`` when the body is empty, is not valid
        JSON, or is valid JSON of another type (list, string, number, ...).
    """
    body = read_request_body(exchange)
    if not body or not body.strip():
        return {}
    try:
        parsed = json.loads(body)
    except:
        # Best-effort: malformed input is treated like an empty body.
        return {}
    if not isinstance(parsed, dict):
        return {}
    return parsed
|
|
|
|
|
|
def parse_int(value, default):
    """Convert ``value`` to int, returning ``default`` when that is impossible.

    ``None`` and anything ``int()`` rejects with ValueError or TypeError
    yield ``default``.
    """
    if value is not None:
        try:
            return int(value)
        except (ValueError, TypeError):
            pass
    return default
|
|
|
|
|
|
def bytes_to_hex(byte_array):
    """Render a sequence of byte values as a lowercase hex string.

    Each value is masked with 0xff first, so negative (signed) bytes are
    rendered by their unsigned two-digit form.
    """
    pieces = []
    for value in byte_array:
        pieces.append("%02x" % (value & 0xFF))
    return "".join(pieces)
|
|
|
|
|
|
def hex_to_bytes(hex_str):
    """Convert a hex string into a list of integer byte values.

    The string is consumed two characters at a time; a trailing single
    character is parsed on its own.  Non-hex input raises ValueError.
    """
    return [
        int(hex_str[start:start + 2], 16)
        for start in range(0, len(hex_str), 2)
    ]
|
|
|
|
|
|
def make_link(href):
    """Wrap ``href`` in the single-key dict used for HATEOAS ``_links`` entries."""
    link = {}
    link["href"] = href
    return link
|
|
|
|
|
|
def compile_grep(params):
    """Build the optional case-insensitive "grep" filter from query params.

    Returns:
        A compiled regex for ``params["grep"]``, or None when the parameter
        is absent/empty or cannot be compiled.
    """
    expression = params.get("grep")
    compiled = None
    if expression:
        try:
            compiled = re.compile(expression, re.IGNORECASE)
        except:
            # An unusable pattern is treated as "no grep filter".
            compiled = None
    return compiled
|
|
|
|
|
|
def grep_matches_item(item, pattern):
    """Check whether ``item`` matches the compiled grep ``pattern``.

    For a dict, every top-level string or numeric value is searched (nested
    containers are ignored).  Any other item is searched directly if it is
    text, otherwise through its ``str()`` form.

    Args:
        item: dict (typical API result entry) or any other value.
        pattern: compiled regex, or None meaning "no filter".

    Returns:
        True if ``pattern`` is None or at least one searched value matches.
    """
    if pattern is None:
        return True
    # Under Python 2 / Jython, text is frequently ``unicode`` (Java strings
    # are expected to arrive as ``unicode``) and large integers are ``long``;
    # checking only ``str``/``int`` silently skipped those values.
    try:
        text_types = (basestring,)
        number_types = (int, long, float)
    except NameError:  # Python 3: str and int cover everything
        text_types = (str,)
        number_types = (int, float)

    def value_matches(value):
        if isinstance(value, text_types):
            return pattern.search(value) is not None
        if isinstance(value, number_types):  # includes bool
            return pattern.search(str(value)) is not None
        return False

    if isinstance(item, dict):
        return any(value_matches(value) for value in item.values())
    if isinstance(item, text_types):
        # Search text as-is; str() on non-ASCII unicode can fail on Python 2.
        return pattern.search(item) is not None
    return bool(pattern.search(str(item)))
|
|
|
|
|
|
def with_transaction(program, desc, fn):
    """Run ``fn()`` inside a program transaction while holding ``_tx_lock``.

    The transaction is ended with commit=True when ``fn`` returns, and with
    commit=False (followed by re-raising) when anything is raised.

    Args:
        program: object providing startTransaction/endTransaction.
        desc: transaction description passed to startTransaction.
        fn: zero-argument callable performing the modification.

    Returns:
        Whatever ``fn()`` returns.
    """
    with _tx_lock:
        tx_id = program.startTransaction(desc)
        try:
            outcome = fn()
            program.endTransaction(tx_id, True)
        except:
            # Roll back, then let the caller see the original error.
            program.endTransaction(tx_id, False)
            raise
        return outcome
|
|
|
|
|
|
# ========================================================================
|
|
# Data Type Resolution
|
|
# ========================================================================
|
|
|
|
# Cache of lowercase type name -> Ghidra DataType; None until first use.
_BUILTIN_TYPES = None


def _init_builtin_types():
    """Lazily build and return the builtin data type map.

    The map is assembled in a local dict and only published to the module
    global once it is complete.  Publishing the empty dict first (as was
    done before) lets a concurrent request thread observe a partially filled
    map and miss builtin types.

    Each group of classes is imported separately so that a missing class in
    one group does not discard the others.  Outside Ghidra (imports fail)
    the result is an empty dict.

    Returns:
        dict mapping lowercase type names ("int", "dword", ...) to the
        ``dataType`` singleton of the corresponding Ghidra class.
    """
    global _BUILTIN_TYPES
    if _BUILTIN_TYPES is not None:
        return _BUILTIN_TYPES

    types = {}

    def register(pairs):
        # Tolerate individual classes that lack a usable ``dataType``.
        for name, cls in pairs:
            try:
                types[name] = cls.dataType
            except:
                pass

    try:
        from ghidra.program.model.data import (
            ByteDataType, WordDataType, DWordDataType, QWordDataType,
            FloatDataType, DoubleDataType, CharDataType,
            ShortDataType, IntegerDataType, LongDataType, LongLongDataType,
            PointerDataType, BooleanDataType,
            StringDataType, UnicodeDataType,
            Undefined1DataType, Undefined2DataType,
            Undefined4DataType, Undefined8DataType
        )
        register([
            ("byte", ByteDataType), ("word", WordDataType),
            ("dword", DWordDataType), ("qword", QWordDataType),
            ("float", FloatDataType), ("double", DoubleDataType),
            ("char", CharDataType), ("short", ShortDataType),
            ("int", IntegerDataType), ("long", LongDataType),
            ("longlong", LongLongDataType),
            ("pointer", PointerDataType), ("bool", BooleanDataType),
            ("boolean", BooleanDataType),
            ("string", StringDataType), ("unicode", UnicodeDataType),
            ("undefined1", Undefined1DataType), ("undefined2", Undefined2DataType),
            ("undefined4", Undefined4DataType), ("undefined8", Undefined8DataType),
        ])
    except ImportError:
        pass

    # Also try unsigned variants
    try:
        from ghidra.program.model.data import (
            UnsignedShortDataType, UnsignedIntegerDataType,
            UnsignedLongDataType, UnsignedLongLongDataType
        )
        register([
            ("ushort", UnsignedShortDataType), ("uint", UnsignedIntegerDataType),
            ("ulong", UnsignedLongDataType), ("ulonglong", UnsignedLongLongDataType),
        ])
    except ImportError:
        pass

    # void and unsigned char were not covered by the groups above.
    try:
        from ghidra.program.model.data import (
            VoidDataType, UnsignedCharDataType
        )
        register([
            ("void", VoidDataType), ("uchar", UnsignedCharDataType),
        ])
    except ImportError:
        pass

    _BUILTIN_TYPES = types
    return types
|
|
|
|
|
|
def resolve_data_type(dtm, type_name):
    """Resolve a type name string to a data type object.

    Lookup order:
      1. direct path lookup in ``dtm`` (``/<name>``; a name that already
         starts with ``/`` is used as the path unchanged instead of being
         turned into ``//name``),
      2. the builtin type map (case-insensitive),
      3. a case-insensitive scan over all data types in ``dtm`` (expensive).

    Args:
        dtm: data type manager providing getDataType/getAllDataTypes.
        type_name: type name or absolute data type path.

    Returns:
        The first match, or None if ``type_name`` is empty or unknown.
    """
    if not type_name:
        return None
    # Direct path lookup
    if type_name.startswith("/"):
        path = type_name
    else:
        path = "/" + type_name
    dt = dtm.getDataType(path)
    if dt is not None:
        return dt
    # Lowercase once; it is needed for the builtin map and for every
    # candidate comparison in the scan below.
    wanted = type_name.lower()
    # Builtin types map
    dt = _init_builtin_types().get(wanted)
    if dt is not None:
        return dt
    # Search all data types (expensive fallback)
    it = dtm.getAllDataTypes()
    while it.hasNext():
        candidate = it.next()
        if candidate.getName().lower() == wanted:
            return candidate
    return None
|
|
|
|
|
|
# ========================================================================
|
|
# Comment type mapping
|
|
# ========================================================================
|
|
|
|
# Maps the short comment-type names accepted by the API to the matching
# CodeUnit comment-type constants.
# NOTE(review): presumably used by the /memory/{addr}/comments/{type} handlers
# (not visible in this part of the file) - confirm.
COMMENT_TYPE_MAP = {
    "pre": CodeUnit.PRE_COMMENT,
    "post": CodeUnit.POST_COMMENT,
    "eol": CodeUnit.EOL_COMMENT,
    "plate": CodeUnit.PLATE_COMMENT,
    "repeatable": CodeUnit.REPEATABLE_COMMENT,
}
|
|
|
|
|
|
# ========================================================================
|
|
# Route Table
|
|
# ========================================================================
|
|
|
|
# Route table consumed by GhydraMCPHandler: __init__ pre-compiles every
# pattern and handle() walks the list in order, dispatching to the first entry
# whose HTTP method equals the request method and whose regex matches the
# request path.  Order therefore matters.  Regex capture groups are passed to
# the handler method as positional arguments after the exchange.
ROUTES = [
    # (HTTP method, path regex, handler method name)

    # Root / Meta
    ("GET", r"^/$", "handle_root"),
    ("GET", r"^/info$", "handle_info"),
    ("GET", r"^/program$", "handle_program"),

    # Headless stubs (cursor-dependent, not available headless)
    ("GET", r"^/address$", "handle_address_stub"),
    ("GET", r"^/function$", "handle_function_stub"),

    # Functions - by-name variants (must precede address patterns)
    ("GET", r"^/functions/by-name/([^/]+)/decompile$", "handle_decompile_by_name"),
    ("GET", r"^/functions/by-name/([^/]+)/disassembly$", "handle_disassembly_by_name"),
    ("PUT", r"^/functions/by-name/([^/]+)/signature$", "handle_signature_by_name"),
    ("PATCH", r"^/functions/by-name/([^/]+)$", "handle_patch_function_by_name"),
    ("GET", r"^/functions/by-name/([^/]+)$", "handle_function_by_name"),

    # Functions - by address
    ("GET", r"^/functions/([^/]+)/decompile$", "handle_decompile"),
    ("GET", r"^/functions/([^/]+)/disassembly$", "handle_disassembly"),
    ("GET", r"^/functions/([^/]+)/variables$", "handle_function_variables"),
    ("PUT", r"^/functions/([^/]+)/signature$", "handle_signature"),
    ("PATCH", r"^/functions/([^/]+)$", "handle_patch_function"),
    ("GET", r"^/functions/([^/]+)$", "handle_function_detail"),

    # Functions list / create
    ("GET", r"^/functions/?$", "handle_functions_list"),
    ("POST", r"^/functions/?$", "handle_create_function"),

    # Data
    ("POST", r"^/data/delete$", "handle_data_delete"),
    ("POST", r"^/data/type$", "handle_data_type"),
    ("GET", r"^/data/strings$", "handle_strings"),
    ("GET", r"^/data/?$", "handle_data_list"),
    ("POST", r"^/data/?$", "handle_data_create"),

    # Strings
    ("GET", r"^/strings$", "handle_strings"),

    # Memory
    ("GET", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_get_comment"),
    ("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"),
    ("GET", r"^/memory/blocks$", "handle_memory_blocks"),
    ("GET", r"^/memory$", "handle_memory_read"),
    ("PATCH", r"^/programs/current/memory/([^/]+)$", "handle_memory_write"),

    # Segments
    ("GET", r"^/segments$", "handle_segments"),

    # Symbols
    ("GET", r"^/symbols/imports$", "handle_imports"),
    ("GET", r"^/symbols/exports$", "handle_exports"),
    ("POST", r"^/symbols$", "handle_symbol_create"),
    ("PATCH", r"^/symbols/([^/]+)$", "handle_symbol_rename"),
    ("DELETE", r"^/symbols/([^/]+)$", "handle_symbol_delete"),
    ("GET", r"^/symbols$", "handle_symbols"),

    # Variables
    ("PATCH", r"^/functions/([^/]+)/variables/([^/]+)$", "handle_variable_rename"),
    ("GET", r"^/variables$", "handle_variables"),

    # Bookmarks
    ("POST", r"^/bookmarks$", "handle_bookmark_create"),
    ("DELETE", r"^/bookmarks/([^/]+)$", "handle_bookmark_delete"),
    ("GET", r"^/bookmarks$", "handle_bookmarks"),

    # Data types
    ("POST", r"^/datatypes/enums$", "handle_enum_create"),
    ("GET", r"^/datatypes/enums$", "handle_enums"),
    ("POST", r"^/datatypes/typedefs$", "handle_typedef_create"),
    ("GET", r"^/datatypes/typedefs$", "handle_typedefs"),

    # Cross-references
    ("GET", r"^/xrefs$", "handle_xrefs"),

    # Classes & Namespaces
    ("GET", r"^/classes$", "handle_classes"),
    ("GET", r"^/namespaces$", "handle_namespaces"),

    # Structs
    ("POST", r"^/structs/create$", "handle_struct_create"),
    ("POST", r"^/structs/addfield$", "handle_struct_addfield"),
    ("POST", r"^/structs/updatefield$", "handle_struct_updatefield"),
    ("POST", r"^/structs/delete$", "handle_struct_delete"),
    ("GET", r"^/structs$", "handle_structs"),

    # Analysis
    ("GET", r"^/analysis/callgraph$", "handle_callgraph"),
    ("GET", r"^/analysis/dataflow$", "handle_dataflow"),
    ("GET", r"^/analysis$", "handle_analysis_info"),
    ("POST", r"^/analysis$", "handle_analysis_run"),

    # Legacy compatibility
    ("GET", r"^/decompile$", "handle_decompile_legacy"),
]
|
|
|
|
|
|
# ========================================================================
|
|
# HTTP Handler
|
|
# ========================================================================
|
|
|
|
class GhydraMCPHandler(HttpHandler):
|
|
|
|
def __init__(self, program, decompiler):
    """Store the program and decompiler and pre-compile the route table.

    Args:
        program: the program served by this handler (may be falsy; handlers
            then answer with the NO_PROGRAM_LOADED error).
        decompiler: decompiler interface used by the decompile handlers.
    """
    self.program = program
    self.decompiler = decompiler
    # Pre-compile route patterns
    self.routes = [
        (verb, re.compile(regex), target)
        for verb, regex, target in ROUTES
    ]
|
|
|
|
# ------------------------------------------------------------------
|
|
# Dispatch
|
|
# ------------------------------------------------------------------
|
|
|
|
def handle(self, exchange):
    """Dispatch one HTTP request to the first matching route handler.

    Routes are matched against the *raw* request path and each captured
    group is URL-decoded exactly once.  Matching the already-decoded path
    and decoding the groups again mangled identifiers that contain an
    encoded ``%2B`` or ``%25``.

    A handler may return either a response dict (sent with status 200) or a
    ``(response, status_code)`` tuple.  Unmatched requests get a 404 JSON
    error, and any failure while handling gets a 500 JSON error.

    Args:
        exchange: the HttpExchange of the current request.
    """
    import sys
    try:
        method = exchange.getRequestMethod()
        uri = exchange.getRequestURI()
        raw_path = uri.getRawPath()

        # CORS preflight
        if method == "OPTIONS":
            self._send_cors_preflight(exchange)
            return

        # Match routes
        for route_method, pattern, handler_name in self.routes:
            if method != route_method:
                continue
            match = pattern.match(raw_path)
            if not match:
                continue
            handler = getattr(self, handler_name)
            decoded = tuple(url_decode(g) for g in match.groups())
            result = handler(exchange, *decoded)
            if isinstance(result, tuple):
                response, code = result
            else:
                response, code = result, 200
            self._send_response(exchange, code, response)
            return

        # No route matched
        self._send_response(exchange, 404, {
            "success": False,
            "error": {"code": "NOT_FOUND", "message": "Endpoint not found: %s %s" % (method, uri.getPath())}
        })
    except:
        # Bare except on purpose: under Jython, Java exceptions thrown by
        # the Ghidra API may not derive from Python's Exception, and they
        # should still produce a JSON error instead of a dropped connection.
        err = sys.exc_info()[1]
        try:
            message = str(err)
        except:
            message = repr(err)
        try:
            self._send_response(exchange, 500, {
                "success": False,
                "error": {"code": "INTERNAL_ERROR", "message": message}
            })
        except:
            # The error response could not be delivered (for example the
            # headers were already sent).  Release the exchange so the
            # client is not left waiting on an open connection.
            try:
                exchange.close()
            except:
                pass
|
|
|
|
def _send_response(self, exchange, code, data):
    """Serialize ``data`` as pretty-printed JSON and send it with CORS headers.

    Args:
        exchange: the HttpExchange to answer.
        code: HTTP status code.
        data: JSON-serializable response object.

    Raises:
        Whatever ``json.dumps`` raises for non-serializable data (before
        anything is sent), or the I/O error of a failed send.
    """
    payload = json.dumps(data, indent=2).encode('utf-8')
    headers = exchange.getResponseHeaders()
    headers.set("Content-Type", "application/json; charset=utf-8")
    headers.set("Access-Control-Allow-Origin", "*")
    headers.set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS")
    headers.set("Access-Control-Allow-Headers", "Content-Type, X-Request-ID")
    exchange.sendResponseHeaders(code, len(payload))
    body_stream = exchange.getResponseBody()
    try:
        body_stream.write(payload)
    finally:
        # Always release the stream, even if the client went away mid-write.
        body_stream.close()
|
|
|
|
def _send_cors_preflight(self, exchange):
    """Answer a CORS preflight request with 204 and no response body."""
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS"),
        ("Access-Control-Allow-Headers", "Content-Type, X-Request-ID"),
        ("Access-Control-Max-Age", "86400"),
    )
    response_headers = exchange.getResponseHeaders()
    for header_name, header_value in cors_headers:
        response_headers.set(header_name, header_value)
    # A length of -1 tells the server that no body will be sent.
    exchange.sendResponseHeaders(204, -1)
    exchange.getResponseBody().close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _no_program(self):
    """Return the standard error response used when no program is loaded."""
    error = {"code": "NO_PROGRAM_LOADED", "message": "No program loaded"}
    return {"success": False, "error": error}
|
|
|
|
def _find_function_at(self, addr_str):
    """Look up the function for an address string.

    A function starting exactly at the address is preferred; otherwise the
    function containing the address is returned.

    Returns:
        The function, or None when there is none or the lookup fails for
        any reason (for example an unparsable address).
    """
    try:
        program = self.program
        address = program.getAddressFactory().getAddress(addr_str)
        manager = program.getFunctionManager()
        exact = manager.getFunctionAt(address)
        if exact is not None:
            return exact
        return manager.getFunctionContaining(address)
    except:
        # Best-effort lookup: every failure means "not found".
        return None
|
|
|
|
def _find_function_by_name(self, name):
    """Return the first function whose name equals ``name`` exactly, else None.

    Functions are visited in the order yielded by ``getFunctions(True)``.
    """
    all_functions = self.program.getFunctionManager().getFunctions(True)
    return next(
        (candidate for candidate in all_functions if candidate.getName() == name),
        None,
    )
|
|
|
|
def _get_function(self, identifier, by_name=False):
    """Resolve ``identifier`` as a function name (``by_name``) or as an address."""
    finder = self._find_function_by_name if by_name else self._find_function_at
    return finder(identifier)
|
|
|
|
def _decompile_function(self, func):
    """Decompile ``func`` and wrap the outcome in an API response dict.

    The decompiler is called with a timeout argument of 60.  On success the
    C code is stored under "decompiled" plus two aliases and the signature
    is added; otherwise an "error" message is stored.  The envelope carries
    "success": True in both cases.
    """
    outcome = self.decompiler.decompileFunction(func, 60, getMonitor())
    entry = str(func.getEntryPoint())
    payload = {
        "name": func.getName(),
        "address": entry,
    }
    if not (outcome and outcome.decompileCompleted()):
        reason = outcome.getErrorMessage() if outcome else "Unknown error"
        payload["error"] = "Decompilation failed: %s" % reason
    else:
        decompiled = outcome.getDecompiledFunction()
        source = decompiled.getC()
        payload["decompiled"] = source
        # Aliases: "decompiled_text" for MCP client compat, "ccode" for
        # API spec compat.
        payload["decompiled_text"] = source
        payload["ccode"] = source
        payload["signature"] = decompiled.getSignature()
    payload["_links"] = {
        "self": make_link("/functions/%s/decompile" % entry),
        "function": make_link("/functions/%s" % entry),
        "disassembly": make_link("/functions/%s/disassembly" % entry),
    }
    return {"success": True, "result": payload}
|
|
|
|
def _build_disassembly(self, func):
    """Build the disassembly response for every instruction in ``func``'s body.

    Each instruction is reported with address, mnemonic, operands (the
    instruction's string form with the mnemonic prefix cut off) and its
    bytes as hex.  A plain-text listing of the same instructions is included
    as "disassembly_text" for MCP clients.
    """
    walker = self.program.getListing().getInstructions(func.getBody(), True)
    instructions = []
    text_lines = []
    while walker.hasNext():
        instr = walker.next()
        rendered = str(instr)
        mnemonic = instr.getMnemonicString()
        if len(rendered) > len(mnemonic):
            operands = rendered[len(mnemonic):].strip()
        else:
            operands = ""
        where = str(instr.getAddress())
        instructions.append({
            "address": where,
            "mnemonic": mnemonic,
            "operands": operands,
            "bytes": bytes_to_hex(instr.getBytes()),
        })
        text_lines.append("%s %s %s" % (where, mnemonic, operands))
    entry = str(func.getEntryPoint())
    links = {
        "self": make_link("/functions/%s/disassembly" % entry),
        "function": make_link("/functions/%s" % entry),
        "decompile": make_link("/functions/%s/decompile" % entry),
    }
    listing_result = {
        "name": func.getName(),
        "address": entry,
        "instructions": instructions,
        "instructionCount": len(instructions),
        "disassembly_text": "\n".join(text_lines),
        "_links": links,
    }
    return {"success": True, "result": listing_result}
|
|
|
|
def _build_function_info(self, func):
    """Assemble the detailed description dict for ``func``.

    Contains the basic attributes, the parameter list, any non-empty
    pre/post/eol/plate comments on the code unit at the entry point, the
    function comment (if any) and HATEOAS links.
    """
    entry = func.getEntryPoint()
    addr = str(entry)
    info = {
        "name": func.getName(),
        "address": addr,
        "signature": str(func.getSignature()),
        "returnType": func.getReturnType().getName(),
        "parameterCount": func.getParameterCount(),
        "callingConvention": func.getCallingConventionName(),
        "isThunk": func.isThunk(),
        "isExternal": func.isExternal(),
        "bodySize": func.getBody().getNumAddresses(),
    }

    # Parameters
    info["parameters"] = [
        {
            "name": parameter.getName(),
            "type": parameter.getDataType().getName(),
            "ordinal": parameter.getOrdinal(),
        }
        for parameter in func.getParameters()
    ]

    # Comments attached to the code unit at the entry point
    code_unit = self.program.getListing().getCodeUnitAt(entry)
    if code_unit:
        comment_kinds = (
            (CodeUnit.PRE_COMMENT, "preComment"),
            (CodeUnit.POST_COMMENT, "postComment"),
            (CodeUnit.EOL_COMMENT, "eolComment"),
            (CodeUnit.PLATE_COMMENT, "plateComment"),
        )
        for kind, key in comment_kinds:
            text = code_unit.getComment(kind)
            if text:
                info[key] = text
    function_comment = func.getComment()
    if function_comment:
        info["comment"] = function_comment

    # HATEOAS links
    info["_links"] = {
        "self": make_link("/functions/%s" % addr),
        "decompile": make_link("/functions/%s/decompile" % addr),
        "disassembly": make_link("/functions/%s/disassembly" % addr),
        "variables": make_link("/functions/%s/variables" % addr),
        "xrefs_to": make_link("/xrefs?to_addr=%s" % addr),
        "xrefs_from": make_link("/xrefs?from_addr=%s" % addr),
    }
    return info
|
|
|
|
def _build_xref_info(self, ref):
    """Describe one cross-reference as a dict.

    Always contains source/target address, reference type name and the
    primary flag; "fromFunction"/"toFunction" are added only when the
    respective address lies inside a function.
    """
    source = ref.getFromAddress()
    target = ref.getToAddress()
    info = {
        "fromAddress": str(source),
        "toAddress": str(target),
        "type": self._get_ref_type_name(ref.getReferenceType()),
        "isPrimary": ref.isPrimary(),
    }
    manager = self.program.getFunctionManager()
    for key, address in (("fromFunction", source), ("toFunction", target)):
        owner = manager.getFunctionContaining(address)
        if owner:
            info[key] = owner.getName()
    return info
|
|
|
|
def _get_ref_type_name(self, ref_type):
    """Map a reference type to a coarse category label.

    The predicates are tried in a fixed order (call, data, read, write,
    jump) and the first one that holds decides the label; if none holds,
    the string form of ``ref_type`` is returned.
    """
    checks = (
        ("isCall", "CALL"),
        ("isData", "DATA"),
        ("isRead", "READ"),
        ("isWrite", "WRITE"),
        ("isJump", "JUMP"),
    )
    for predicate, label in checks:
        if getattr(ref_type, predicate)():
            return label
    return str(ref_type)
|
|
|
|
def _find_struct(self, name):
    """Return the first Structure or Union data type named exactly ``name``.

    All data types of the program's data type manager are scanned; entries
    with a matching name but another kind are skipped.  Returns None when
    nothing qualifies.
    """
    candidates = self.program.getDataTypeManager().getAllDataTypes()
    while candidates.hasNext():
        candidate = candidates.next()
        if candidate.getName() != name:
            continue
        # Imported only once a name matches, as in the original flow.
        from ghidra.program.model.data import Structure, Union
        if isinstance(candidate, (Structure, Union)):
            return candidate
    return None
|
|
|
|
# ==================================================================
|
|
# Root / Meta Handlers
|
|
# ==================================================================
|
|
|
|
def handle_root(self, exchange):
    """GET / - API banner with version info and top-level links.

    When a program is loaded, its name, executable path and language id are
    included as well.
    """
    banner = {
        "success": True,
        "api_version": API_VERSION,
        "api_version_string": API_VERSION_STRING,
        "message": "GhydraMCP Headless API",
        "mode": "headless",
    }
    program = self.program
    if program:
        banner.update({
            "program": program.getName(),
            "file": program.getExecutablePath(),
            "language": str(program.getLanguageID()),
        })
    links = {}
    links["self"] = make_link("/")
    links["info"] = make_link("/info")
    links["program"] = make_link("/program")
    links["functions"] = make_link("/functions")
    links["data"] = make_link("/data")
    links["strings"] = make_link("/strings")
    links["memory"] = make_link("/memory")
    links["segments"] = make_link("/segments")
    links["symbols"] = make_link("/symbols")
    links["xrefs"] = make_link("/xrefs")
    links["classes"] = make_link("/classes")
    links["namespaces"] = make_link("/namespaces")
    links["structs"] = make_link("/structs")
    links["analysis"] = make_link("/analysis")
    banner["_links"] = links
    return banner
|
|
|
|
def handle_info(self, exchange):
    """GET /info - summary of the loaded program (name, language, memory, ...)."""
    program = self.program
    if not program:
        return self._no_program()
    memory = program.getMemory()
    default_space = program.getAddressFactory().getDefaultAddressSpace()
    details = {
        "name": program.getName(),
        "path": program.getExecutablePath(),
        "language": str(program.getLanguageID()),
        "processor": str(program.getLanguage().getProcessor()),
        "addressSize": default_space.getSize(),
        "imageBase": str(program.getImageBase()),
        "minAddress": str(memory.getMinAddress()),
        "maxAddress": str(memory.getMaxAddress()),
        "memorySize": memory.getSize(),
        "functionCount": program.getFunctionManager().getFunctionCount(),
        "mode": "headless",
    }
    details["_links"] = {
        "self": make_link("/info"),
        "program": make_link("/program"),
        "functions": make_link("/functions"),
    }
    return {"success": True, "result": details}
|
|
|
|
def handle_program(self, exchange):
    """GET /program - program details including the compiler spec id."""
    program = self.program
    if not program:
        return self._no_program()
    memory = program.getMemory()
    functions = program.getFunctionManager()
    default_space = program.getAddressFactory().getDefaultAddressSpace()
    details = {
        "name": program.getName(),
        "path": program.getExecutablePath(),
        "language": str(program.getLanguageID()),
        "compiler": str(program.getCompilerSpec().getCompilerSpecID()),
        "processor": str(program.getLanguage().getProcessor()),
        "addressSize": default_space.getSize(),
        "imageBase": str(program.getImageBase()),
        "minAddress": str(memory.getMinAddress()),
        "maxAddress": str(memory.getMaxAddress()),
        "memorySize": memory.getSize(),
        "functionCount": functions.getFunctionCount(),
    }
    details["_links"] = {
        "self": make_link("/program"),
        "functions": make_link("/functions"),
        "symbols": make_link("/symbols"),
        "segments": make_link("/segments"),
        "analysis": make_link("/analysis"),
    }
    return {"success": True, "result": details}
|
|
|
|
# ==================================================================
|
|
# Headless Stubs
|
|
# ==================================================================
|
|
|
|
def handle_address_stub(self, exchange):
    """GET /address - always answers with the HEADLESS_MODE error."""
    error = {
        "code": "HEADLESS_MODE",
        "message": "Current address is not available in headless mode. Use /functions or /data with address parameters instead."
    }
    return {"success": False, "error": error}
|
|
|
|
def handle_function_stub(self, exchange):
    """GET /function - always answers with the HEADLESS_MODE error."""
    error = {
        "code": "HEADLESS_MODE",
        "message": "Current function is not available in headless mode. Use /functions/{address} instead."
    }
    return {"success": False, "error": error}
|
|
|
|
# ==================================================================
|
|
# Function Handlers
|
|
# ==================================================================
|
|
|
|
def handle_functions_list(self, exchange):
    """GET /functions - paginated, filterable list of functions.

    Query parameters:
        limit, offset: paging (defaults 100 / 0; negative values are
            treated as 0).
        name, name_contains: case-insensitive substring filters on the name.
        name_matches_regex: case-insensitive regex searched in the name.
        addr: return only the function at/containing this address.
        grep: case-insensitive regex matched against the result entry.

    All filters, including ``grep``, are applied *before* ``offset`` is
    consumed, so consecutive pages of a filtered listing neither overlap
    nor skip entries.

    Returns:
        Response dict.  "size" is the total number of functions in the
        program (not the number of filtered matches).
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = max(0, parse_int(params.get("limit"), 100))
    offset = max(0, parse_int(params.get("offset"), 0))
    name_filter = params.get("name")
    name_contains = params.get("name_contains")
    name_regex = params.get("name_matches_regex")
    addr_filter = params.get("addr")
    grep_pattern = compile_grep(params)

    # Compile name regex if provided
    name_regex_pat = None
    if name_regex:
        try:
            name_regex_pat = re.compile(name_regex, re.IGNORECASE)
        except:
            return {"success": False, "error": {"code": "INVALID_REGEX", "message": "Invalid regex: %s" % name_regex}}

    def summarize(func):
        # Compact list entry shared by the single lookup and the listing.
        addr = str(func.getEntryPoint())
        return {
            "name": func.getName(),
            "address": addr,
            "signature": str(func.getSignature()),
            "parameterCount": func.getParameterCount(),
            "isThunk": func.isThunk(),
            "_links": {
                "self": make_link("/functions/%s" % addr),
                "decompile": make_link("/functions/%s/decompile" % addr),
                "disassembly": make_link("/functions/%s/disassembly" % addr),
            },
        }

    fm = self.program.getFunctionManager()
    total = fm.getFunctionCount()

    # Single function lookup by address
    if addr_filter:
        func = self._find_function_at(addr_filter)
        if not func:
            return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit}
        return {"success": True, "result": [summarize(func)], "size": 1, "offset": 0, "limit": limit}

    name_filter_lc = name_filter.lower() if name_filter else None
    name_contains_lc = name_contains.lower() if name_contains else None

    functions = []
    skipped = 0
    for func in fm.getFunctions(True):
        if len(functions) >= limit:
            break
        func_name = func.getName()
        # Apply name filters
        if name_filter_lc or name_contains_lc:
            lowered = func_name.lower()
            if name_filter_lc and name_filter_lc not in lowered:
                continue
            if name_contains_lc and name_contains_lc not in lowered:
                continue
        if name_regex_pat and not name_regex_pat.search(func_name):
            continue
        # grep needs the full entry; only build it early when required so
        # that plain paging does not pay for entries it skips.
        item = None
        if grep_pattern is not None:
            item = summarize(func)
            if not grep_matches_item(item, grep_pattern):
                continue
        if skipped < offset:
            skipped += 1
            continue
        functions.append(item if item is not None else summarize(func))

    count = len(functions)
    result = {
        "success": True,
        "result": functions,
        "size": total,
        "offset": offset,
        "limit": limit,
        "_links": {
            "self": make_link("/functions?offset=%d&limit=%d" % (offset, limit)),
        },
    }
    if offset + count < total:
        result["_links"]["next"] = make_link("/functions?offset=%d&limit=%d" % (offset + limit, limit))
    if offset > 0:
        result["_links"]["prev"] = make_link("/functions?offset=%d&limit=%d" % (max(0, offset - limit), limit))
    return result
|
|
|
|
def handle_function_detail(self, exchange, addr_str):
    """GET /functions/{address} - detailed info for the function at an address."""
    if self.program:
        target = self._find_function_at(addr_str)
        if target:
            return {"success": True, "result": self._build_function_info(target)}
        not_found = {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}
        return {"success": False, "error": not_found}
    return self._no_program()
|
|
|
|
def handle_function_by_name(self, exchange, name):
    """GET /functions/by-name/{name} - detailed info for the named function."""
    if self.program:
        target = self._find_function_by_name(name)
        if target:
            return {"success": True, "result": self._build_function_info(target)}
        not_found = {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}
        return {"success": False, "error": not_found}
    return self._no_program()
|
|
|
|
# -- Decompile --
|
|
|
|
def handle_decompile(self, exchange, addr_str):
    """GET /functions/{address}/decompile - decompile the function at an address."""
    if self.program:
        target = self._find_function_at(addr_str)
        if target:
            return self._decompile_function(target)
        not_found = {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}
        return {"success": False, "error": not_found}
    return self._no_program()
|
|
|
|
def handle_decompile_by_name(self, exchange, name):
    """GET /functions/by-name/{name}/decompile - decompile the named function."""
    if self.program:
        target = self._find_function_by_name(name)
        if target:
            return self._decompile_function(target)
        not_found = {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}
        return {"success": False, "error": not_found}
    return self._no_program()
|
|
|
|
# -- Disassembly --
|
|
|
|
def handle_disassembly(self, exchange, addr_str):
    """GET /functions/{address}/disassembly - disassemble the function at an address."""
    if self.program:
        target = self._find_function_at(addr_str)
        if target:
            return self._build_disassembly(target)
        not_found = {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}
        return {"success": False, "error": not_found}
    return self._no_program()
|
|
|
|
def handle_disassembly_by_name(self, exchange, name):
    """GET /functions/by-name/{name}/disassembly - disassemble the named function."""
    if self.program:
        target = self._find_function_by_name(name)
        if target:
            return self._build_disassembly(target)
        not_found = {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}
        return {"success": False, "error": not_found}
    return self._no_program()
|
|
|
|
# -- Variables --
|
|
|
|
def handle_function_variables(self, exchange, addr_str):
    """GET /functions/{address}/variables - parameters and locals of a function.

    Parameters come first (kind "parameter", with ordinal), then local
    variables (kind "local").  A local's "stackOffset" is included only if
    it can be read and is non-zero.
    """
    if not self.program:
        return self._no_program()
    func = self._find_function_at(addr_str)
    if not func:
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}}

    def describe(variable, kind):
        return {
            "name": variable.getName(),
            "type": variable.getDataType().getName(),
            "storage": str(variable.getVariableStorage()),
            "kind": kind,
        }

    variables = []
    # Parameters
    for parameter in func.getParameters():
        entry = describe(parameter, "parameter")
        entry["ordinal"] = parameter.getOrdinal()
        variables.append(entry)
    # Local variables
    locals_found = func.getLocalVariables()
    for local in locals_found:
        entry = describe(local, "local")
        try:
            stack_offset = local.getStackOffset()
            if stack_offset != 0:
                entry["stackOffset"] = stack_offset
        except:
            # Offset could not be read; simply leave it out.
            pass
        variables.append(entry)

    summary = {
        "name": func.getName(),
        "address": str(func.getEntryPoint()),
        "variables": variables,
        "parameterCount": func.getParameterCount(),
        "localVariableCount": len(locals_found),
    }
    return {"success": True, "result": summary}
|
|
|
|
# -- Patch (rename / comment) --
|
|
|
|
def _handle_patch_function_impl(self, exchange, identifier, by_name):
|
|
if not self.program:
|
|
return self._no_program()
|
|
func = self._get_function(identifier, by_name)
|
|
if not func:
|
|
return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % identifier}}
|
|
|
|
body = parse_json_body(exchange)
|
|
new_name = body.get("name")
|
|
comment = body.get("comment")
|
|
|
|
if not new_name and comment is None:
|
|
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide 'name' and/or 'comment' in request body"}}
|
|
|
|
old_name = func.getName()
|
|
addr = str(func.getEntryPoint())
|
|
result = {"address": addr, "oldName": old_name}
|
|
|
|
def do_patch():
|
|
if new_name:
|
|
func.setName(new_name, SourceType.USER_DEFINED)
|
|
result["newName"] = new_name
|
|
if comment is not None:
|
|
func.setComment(comment)
|
|
result["comment"] = comment
|
|
|
|
with_transaction(self.program, "Patch function", do_patch)
|
|
result["message"] = "Function updated successfully"
|
|
return {"success": True, "result": result}
|
|
|
|
def handle_patch_function(self, exchange, addr_str):
    """Patch (rename / comment) the function identified by its address."""
    response = self._handle_patch_function_impl(exchange, addr_str, by_name=False)
    return response
|
|
|
|
def handle_patch_function_by_name(self, exchange, name):
    """Patch (rename / comment) the function identified by its name."""
    response = self._handle_patch_function_impl(exchange, name, by_name=True)
    return response
|
|
|
|
# -- Create function --
|
|
|
|
def handle_create_function(self, exchange):
    """Create a function at the address given in the JSON request body.

    Expects {"address": "<addr>"}. On success returns a (payload, 201)
    tuple carrying the new function's name and entry point. Error payloads
    use MISSING_PARAMETER, INVALID_ADDRESS, CREATE_FAILED or CREATE_ERROR.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    addr_str = body.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' in request body"}}

    invalid_address = {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return invalid_address
    # AddressFactory.getAddress() reports an unparseable string by returning
    # null instead of throwing, so that case has to be rejected explicitly.
    if addr is None:
        return invalid_address

    result_holder = [None]

    def do_create():
        fm = self.program.getFunctionManager()
        result_holder[0] = fm.createFunction(None, addr, None, SourceType.USER_DEFINED)

    try:
        with_transaction(self.program, "Create function", do_create)
        func = result_holder[0]
        if func:
            return ({"success": True, "result": {
                "name": func.getName(),
                "address": str(func.getEntryPoint()),
                "message": "Function created successfully",
            }}, 201)
        return {"success": False, "error": {"code": "CREATE_FAILED", "message": "Failed to create function at %s" % addr_str}}
    except Exception as e:
        return {"success": False, "error": {"code": "CREATE_ERROR", "message": str(e)}}
|
|
|
|
# -- Signature --
|
|
|
|
def _handle_signature_impl(self, exchange, identifier, by_name):
|
|
if not self.program:
|
|
return self._no_program()
|
|
func = self._get_function(identifier, by_name)
|
|
if not func:
|
|
return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % identifier}}
|
|
|
|
body = parse_json_body(exchange)
|
|
sig_str = body.get("signature")
|
|
if not sig_str:
|
|
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'signature' in request body"}}
|
|
|
|
addr = str(func.getEntryPoint())
|
|
try:
|
|
from ghidra.app.util.cparser.C import CParser
|
|
dtm = self.program.getDataTypeManager()
|
|
parser = CParser(dtm)
|
|
parsed = parser.parse(sig_str + ";")
|
|
|
|
def do_set_sig():
|
|
from ghidra.program.model.data import FunctionDefinitionDataType
|
|
if isinstance(parsed, FunctionDefinitionDataType):
|
|
from ghidra.app.cmd.function import ApplyFunctionSignatureCmd
|
|
cmd = ApplyFunctionSignatureCmd(func.getEntryPoint(), parsed, SourceType.USER_DEFINED)
|
|
cmd.applyTo(self.program, getMonitor())
|
|
|
|
with_transaction(self.program, "Set function signature", do_set_sig)
|
|
return {"success": True, "result": {
|
|
"address": addr,
|
|
"signature": sig_str,
|
|
"message": "Signature updated",
|
|
}}
|
|
except Exception as e:
|
|
# Fallback: try just setting name from signature
|
|
return {"success": False, "error": {"code": "SIGNATURE_ERROR", "message": "Could not parse signature: %s" % str(e)}}
|
|
|
|
def handle_signature(self, exchange, addr_str):
    """Set the signature of the function identified by its address."""
    response = self._handle_signature_impl(exchange, addr_str, by_name=False)
    return response
|
|
|
|
def handle_signature_by_name(self, exchange, name):
    """Set the signature of the function identified by its name."""
    response = self._handle_signature_impl(exchange, name, by_name=True)
    return response
|
|
|
|
# ==================================================================
|
|
# Data Handlers
|
|
# ==================================================================
|
|
|
|
def handle_data_list(self, exchange):
    """List defined data items, or look up the single item at 'addr'.

    Query parameters: limit (default 100), offset (default 0), addr, name
    (exact primary-symbol match), name_contains and type (both
    case-insensitive substring matches), plus whatever compile_grep() reads.

    With 'addr' the response holds at most one item; an address that fails
    to resolve or has no data yields an empty successful result. Otherwise
    defined data is walked in forward order; 'offset' counts items that
    passed the type/name filters, and the grep filter is applied afterwards.
    Both response shapes carry 'size', the number of items returned.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    addr_filter = params.get("addr")
    name_filter = params.get("name")
    name_contains = params.get("name_contains")
    type_filter = params.get("type")
    grep_pattern = compile_grep(params)

    def describe(data, sym):
        """Build the response entry for one data item (value capped at 200 chars)."""
        value = data.getDefaultValueRepresentation()
        if value and len(value) > 200:
            value = value[:200] + "..."
        item = {
            "address": str(data.getAddress()),
            "type": data.getDataType().getName(),
            "length": data.getLength(),
            "value": value,
        }
        if sym:
            item["name"] = sym.getName()
        return item

    # Single address lookup
    if addr_filter:
        try:
            addr = self.program.getAddressFactory().getAddress(addr_filter)
            data = self.program.getListing().getDataAt(addr)
            if data:
                item = describe(data, self.program.getSymbolTable().getPrimarySymbol(addr))
                return {"success": True, "result": [item], "size": 1, "offset": 0, "limit": 1}
            return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit}
        except:
            return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit}

    data_items = []
    listing = self.program.getListing()
    st = self.program.getSymbolTable()
    needs_name = bool(name_filter or name_contains)
    count = 0
    skipped = 0

    for data in listing.getDefinedData(True):
        if count >= limit:
            break
        # Type filter
        if type_filter and type_filter.lower() not in data.getDataType().getName().lower():
            continue
        # Name filters need the symbol up front; remember it so it is not
        # looked up a second time when the item is built.
        sym = None
        if needs_name:
            sym = st.getPrimarySymbol(data.getAddress())
            sym_name = sym.getName() if sym else ""
            if name_filter and sym_name != name_filter:
                continue
            if name_contains and name_contains.lower() not in sym_name.lower():
                continue
        if skipped < offset:
            skipped += 1
            continue
        if not needs_name:
            sym = st.getPrimarySymbol(data.getAddress())
        item = describe(data, sym)
        item["_links"] = {"self": make_link("/data/%s" % str(data.getAddress()))}
        if not grep_matches_item(item, grep_pattern):
            continue
        data_items.append(item)
        count += 1

    return {
        "success": True,
        "result": data_items,
        "size": len(data_items),
        "offset": offset,
        "limit": limit,
    }
|
|
|
|
def handle_data_create(self, exchange):
    """Create a label or a typed data item at an address from a JSON body.

    'address' is required. A non-empty 'newName' creates a user-defined
    label there and returns. Otherwise 'type' is resolved with
    resolve_data_type() and data of that type is created, returning a
    (payload, 201) tuple. Error payloads use MISSING_PARAMETER,
    INVALID_ADDRESS, LABEL_ERROR, UNKNOWN_TYPE or DATA_ERROR.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    addr_str = body.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}}

    invalid_address = {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return invalid_address
    # AddressFactory.getAddress() reports an unparseable string by returning
    # null instead of throwing, so that case has to be rejected explicitly.
    if addr is None:
        return invalid_address

    # Label creation (newName field)
    new_name = body.get("newName")
    if new_name:
        def do_label():
            self.program.getSymbolTable().createLabel(addr, new_name, SourceType.USER_DEFINED)
        try:
            with_transaction(self.program, "Create label", do_label)
            return {"success": True, "result": {"address": addr_str, "name": new_name, "message": "Label created"}}
        except Exception as e:
            return {"success": False, "error": {"code": "LABEL_ERROR", "message": str(e)}}

    # Data creation (type field)
    type_name = body.get("type")
    if not type_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'type' or 'newName'"}}

    dtm = self.program.getDataTypeManager()
    dt = resolve_data_type(dtm, type_name)
    if not dt:
        return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}}

    def do_create_data():
        from ghidra.program.model.data import DataUtilities
        DataUtilities.createData(
            self.program, addr, dt, -1, False,
            DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA
        )

    try:
        with_transaction(self.program, "Create data", do_create_data)
        return ({"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data created"}}, 201)
    except Exception as e:
        return {"success": False, "error": {"code": "DATA_ERROR", "message": str(e)}}
|
|
|
|
def handle_data_delete(self, exchange):
    """Clear the code unit at the address given in the JSON request body.

    'address' is required. The clear runs inside a "Delete data"
    transaction. Error payloads use MISSING_PARAMETER, INVALID_ADDRESS or
    DELETE_ERROR.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    addr_str = body.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}}

    invalid_address = {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return invalid_address
    # AddressFactory.getAddress() reports an unparseable string by returning
    # null instead of throwing, so that case has to be rejected explicitly.
    if addr is None:
        return invalid_address

    def do_delete():
        self.program.getListing().clearCodeUnits(addr, addr, False)

    try:
        with_transaction(self.program, "Delete data", do_delete)
        return {"success": True, "result": {"address": addr_str, "message": "Data deleted"}}
    except Exception as e:
        return {"success": False, "error": {"code": "DELETE_ERROR", "message": str(e)}}
|
|
|
|
def handle_data_type(self, exchange):
    """Change data type at an address (clear + recreate).

    The JSON body must carry 'address' and 'type'. The type is resolved
    with resolve_data_type(); the existing code unit is cleared and new
    data of that type created inside one transaction. Error payloads use
    MISSING_PARAMETER, INVALID_ADDRESS, UNKNOWN_TYPE or TYPE_ERROR.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    addr_str = body.get("address")
    type_name = body.get("type")
    if not addr_str or not type_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' and/or 'type'"}}

    invalid_address = {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}
    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return invalid_address
    # AddressFactory.getAddress() reports an unparseable string by returning
    # null instead of throwing, so that case has to be rejected explicitly.
    if addr is None:
        return invalid_address

    dtm = self.program.getDataTypeManager()
    dt = resolve_data_type(dtm, type_name)
    if not dt:
        return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}}

    def do_retype():
        from ghidra.program.model.data import DataUtilities
        self.program.getListing().clearCodeUnits(addr, addr, False)
        DataUtilities.createData(
            self.program, addr, dt, -1, False,
            DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA
        )

    try:
        with_transaction(self.program, "Change data type", do_retype)
        return {"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data type changed"}}
    except Exception as e:
        return {"success": False, "error": {"code": "TYPE_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Strings Handler
|
|
# ==================================================================
|
|
|
|
def handle_strings(self, exchange):
    """List defined string data with value, length, type and xref count.

    Query parameters: limit (default 2000), offset (default 0), filter
    (case-insensitive substring of the value), min_length (default 2), plus
    whatever compile_grep() reads. A data item qualifies when its type name
    contains "string" or is exactly char/wchar (case-insensitive). Any
    error raised while examining an item causes that item to be skipped.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 2000)
    offset = parse_int(params.get("offset"), 0)
    filter_str = params.get("filter")
    min_length = parse_int(params.get("min_length"), 2)
    grep_pattern = compile_grep(params)

    found = []
    listing = self.program.getListing()
    ref_mgr = self.program.getReferenceManager()
    taken = 0
    passed_over = 0

    for data in listing.getDefinedData(True):
        if taken >= limit:
            break
        try:
            dt = data.getDataType()
            if not dt:
                continue
            lowered = dt.getName().lower()
            if not ("string" in lowered or lowered in ("char", "wchar")):
                continue
            raw = data.getValue()
            if not raw:
                continue
            text = str(raw)
            if len(text) < min_length:
                continue
            if filter_str and filter_str.lower() not in text.lower():
                continue
            if passed_over < offset:
                passed_over += 1
                continue
            entry = {
                "address": str(data.getAddress()),
                "value": text[:200],
                "length": len(text),
                "type": dt.getName(),
            }
            # Xref count: drain the reference iterator and count its elements.
            xref_total = 0
            incoming = ref_mgr.getReferencesTo(data.getAddress())
            while incoming.hasNext():
                incoming.next()
                xref_total += 1
            entry["xrefCount"] = xref_total
            # Label name
            label = self.program.getSymbolTable().getPrimarySymbol(data.getAddress())
            if label:
                entry["name"] = label.getName()
            if grep_matches_item(entry, grep_pattern):
                found.append(entry)
                taken += 1
        except:
            pass

    return {
        "success": True,
        "result": found,
        "size": len(found),
        "offset": offset,
        "limit": limit,
    }
|
|
|
|
# ==================================================================
|
|
# Memory Handlers
|
|
# ==================================================================
|
|
|
|
def handle_memory_read(self, exchange):
    """Read up to 4096 bytes of program memory and return them formatted.

    Query parameters: address (required), length (default 16, must be
    1-4096) and format ("hex" by default; "base64" and "string" are also
    recognised, anything else is rendered as hex). "string" renders bytes
    32..126 as characters and everything else as '.'. The hex rendering is
    always included as 'hexBytes' and 'rawBytes'.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    addr_str = params.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}}

    length = parse_int(params.get("length"), 16)
    fmt = params.get("format", "hex")
    if length <= 0 or length > 4096:
        return {"success": False, "error": {"code": "INVALID_PARAMETER", "message": "Length must be 1-4096"}}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        mem = self.program.getMemory()
        from jarray import zeros
        byte_array = zeros(length, 'b')
        bytes_read = mem.getBytes(addr, byte_array)
        payload = byte_array[:bytes_read]
        # The hex form is needed for every format, so render it exactly once.
        hex_text = bytes_to_hex(payload)

        if fmt == "base64":
            from java.util import Base64
            # Convert to unsigned bytes for Base64
            unsigned = bytearray([(b & 0xff) for b in payload])
            formatted = Base64.getEncoder().encodeToString(unsigned)
        elif fmt == "string":
            chars = []
            for i in range(bytes_read):
                b = byte_array[i] & 0xff
                if 32 <= b < 127:
                    chars.append(chr(b))
                else:
                    chars.append('.')
            formatted = ''.join(chars)
        else:
            formatted = hex_text

        return {
            "success": True,
            "result": {
                "address": str(addr),
                "bytesRead": bytes_read,
                "format": fmt,
                "bytes": formatted,
                "hexBytes": hex_text,
                "rawBytes": hex_text,
            }
        }
    except Exception as e:
        return {"success": False, "error": {"code": "MEMORY_ERROR", "message": str(e)}}
|
|
|
|
def handle_memory_write(self, exchange, addr_str):
    """Write bytes from the JSON request body to memory at addr_str.

    Body fields: 'bytes' (required) and 'format' ("hex" by default;
    "base64" and "string" are also recognised, anything else is decoded as
    hex). The write runs inside a "Write memory" transaction; failures are
    reported as MEMORY_ERROR.
    """
    if not self.program:
        return self._no_program()
    payload = parse_json_body(exchange)
    bytes_str = payload.get("bytes")
    fmt = payload.get("format", "hex")

    if not bytes_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'bytes'"}}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        if fmt == "string":
            byte_vals = [ord(ch) for ch in bytes_str]
        elif fmt == "base64":
            from java.util import Base64
            byte_vals = list(Base64.getDecoder().decode(bytes_str))
        else:
            byte_vals = hex_to_bytes(bytes_str)

        # Convert to Java byte array; values above 127 are stored as their
        # signed equivalent (value - 256).
        from jarray import zeros
        java_bytes = zeros(len(byte_vals), 'b')
        for idx, val in enumerate(byte_vals):
            if val > 127:
                val = val - 256
            java_bytes[idx] = val

        def do_write():
            self.program.getMemory().setBytes(addr, java_bytes)

        with_transaction(self.program, "Write memory", do_write)
        written = {"address": str(addr), "bytesWritten": len(byte_vals)}
        return {"success": True, "result": written}
    except Exception as e:
        return {"success": False, "error": {"code": "MEMORY_ERROR", "message": str(e)}}
|
|
|
|
def handle_memory_blocks(self, exchange):
    """Return the memory block list; delegates to the /segments handler."""
    segments_response = self.handle_segments(exchange)
    return segments_response
|
|
|
|
def handle_get_comment(self, exchange, addr_str, comment_type):
    """Fetch the comment of the given type at an address.

    comment_type is looked up (lower-cased) in COMMENT_TYPE_MAP. The code
    unit starting at the address is used, falling back to the one
    containing it; when neither exists the comment is reported as null.
    """
    if not self.program:
        return self._no_program()

    ct = COMMENT_TYPE_MAP.get(comment_type.lower())
    if ct is None:
        return {"success": False, "error": {
            "code": "INVALID_COMMENT_TYPE",
            "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type
        }}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        listing = self.program.getListing()
        unit = listing.getCodeUnitAt(addr) or listing.getCodeUnitContaining(addr)
        text = unit.getComment(ct) if unit else None
        return {"success": True, "result": {
            "address": addr_str,
            "commentType": comment_type,
            "comment": text,
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}}
|
|
|
|
def handle_set_comment(self, exchange, addr_str, comment_type):
    """Store a comment of the given type at an address.

    The text comes from the 'comment' field of the JSON body (default "");
    an empty text is written as None. comment_type is looked up
    (lower-cased) in COMMENT_TYPE_MAP. The code unit starting at the
    address is used, falling back to the one containing it.
    """
    if not self.program:
        return self._no_program()
    payload = parse_json_body(exchange)
    comment_text = payload.get("comment", "")

    ct = COMMENT_TYPE_MAP.get(comment_type.lower())
    if ct is None:
        return {"success": False, "error": {
            "code": "INVALID_COMMENT_TYPE",
            "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type
        }}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        listing = self.program.getListing()
        unit = listing.getCodeUnitAt(addr) or listing.getCodeUnitContaining(addr)
        if not unit:
            return {"success": False, "error": {"code": "NO_CODE_UNIT", "message": "No code unit at address: %s" % addr_str}}

        def do_comment():
            unit.setComment(ct, comment_text or None)

        with_transaction(self.program, "Set comment", do_comment)
        return {"success": True, "result": {
            "address": addr_str,
            "commentType": comment_type,
            "comment": comment_text,
            "message": "Comment set successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Segments Handler
|
|
# ==================================================================
|
|
|
|
def handle_segments(self, exchange):
    """List the program's memory blocks with permissions and links.

    The optional 'name' query parameter keeps only blocks whose name
    contains it (case-sensitive substring match).
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    name_filter = params.get("name")

    def describe(block):
        first = str(block.getStart())
        return {
            "name": block.getName(),
            "start": first,
            "end": str(block.getEnd()),
            "size": block.getSize(),
            "readable": block.isRead(),
            "writable": block.isWrite(),
            "executable": block.isExecute(),
            "initialized": block.isInitialized(),
            "_links": {
                "self": make_link("/segments/%s" % block.getName()),
                "memory": make_link("/memory?address=%s&length=1024" % first),
            },
        }

    segments = [
        describe(block)
        for block in self.program.getMemory().getBlocks()
        if not name_filter or name_filter in block.getName()
    ]
    return {"success": True, "result": segments}
|
|
|
|
# ==================================================================
|
|
# Symbol Handlers
|
|
# ==================================================================
|
|
|
|
def handle_symbols(self, exchange):
    """List symbols from the symbol table, with optional filtering.

    Query parameters: limit (default 100), offset (default 0), name
    (case-insensitive substring), type (case-insensitive exact match on
    the symbol type's string form), plus whatever compile_grep() reads.
    'offset' counts symbols that passed the name/type filters; the grep
    filter is applied afterwards.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    name_filter = params.get("name")
    type_filter = params.get("type")
    grep_pattern = compile_grep(params)

    collected = []
    table = self.program.getSymbolTable()
    taken = 0
    passed_over = 0

    for sym in table.getAllSymbols(True):
        if taken >= limit:
            break
        if name_filter and name_filter.lower() not in sym.getName().lower():
            continue
        if type_filter:
            if str(sym.getSymbolType()).lower() != type_filter.lower():
                continue
        if passed_over < offset:
            passed_over += 1
            continue
        entry = {
            "name": sym.getName(),
            "address": str(sym.getAddress()),
            "namespace": sym.getParentNamespace().getName(),
            "type": str(sym.getSymbolType()),
            "isPrimary": sym.isPrimary(),
            "isExternal": sym.isExternal(),
        }
        if grep_matches_item(entry, grep_pattern):
            collected.append(entry)
            taken += 1

    links = {
        "self": make_link("/symbols"),
        "imports": make_link("/symbols/imports"),
        "exports": make_link("/symbols/exports"),
    }
    return {
        "success": True,
        "result": collected,
        "offset": offset,
        "limit": limit,
        "_links": links,
    }
|
|
|
|
def handle_imports(self, exchange):
    """List the program's external symbols (name, address, namespace).

    Query parameters: limit (default 100), offset (default 0), plus
    whatever compile_grep() reads. 'offset' is consumed before the grep
    filter is applied.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    grep_pattern = compile_grep(params)

    collected = []
    taken = 0
    passed_over = 0
    for sym in self.program.getSymbolTable().getExternalSymbols():
        if taken >= limit:
            break
        if passed_over < offset:
            passed_over += 1
            continue
        entry = {
            "name": sym.getName(),
            "address": str(sym.getAddress()),
            "namespace": sym.getParentNamespace().getName(),
        }
        if grep_matches_item(entry, grep_pattern):
            collected.append(entry)
            taken += 1

    return {"success": True, "result": collected, "offset": offset, "limit": limit}
|
|
|
|
def handle_exports(self, exchange):
    """List symbols that are flagged as external entry points.

    Query parameters: limit (default 100), offset (default 0), plus
    whatever compile_grep() reads. 'offset' counts entry-point symbols and
    is consumed before the grep filter is applied.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    grep_pattern = compile_grep(params)

    collected = []
    taken = 0
    passed_over = 0
    for sym in self.program.getSymbolTable().getAllSymbols(True):
        if taken >= limit:
            break
        if not sym.isExternalEntryPoint():
            continue
        if passed_over < offset:
            passed_over += 1
            continue
        entry = {
            "name": sym.getName(),
            "address": str(sym.getAddress()),
        }
        if grep_matches_item(entry, grep_pattern):
            collected.append(entry)
            taken += 1

    return {"success": True, "result": collected, "offset": offset, "limit": limit}
|
|
|
|
# ==================================================================
|
|
# Cross-References Handler
|
|
# ==================================================================
|
|
|
|
def handle_xrefs(self, exchange):
    """List cross-references to 'to_addr' and/or from 'from_addr'.

    Query parameters: to_addr, from_addr (at least one is required), type
    (case-insensitive exact match on the reference type name), limit
    (default 100), offset (default 0), plus whatever compile_grep() reads.
    Each direction is paginated independently and the results are
    concatenated, references-to first.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    to_addr_str = params.get("to_addr")
    from_addr_str = params.get("from_addr")
    type_filter = params.get("type")
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    grep_pattern = compile_grep(params)

    if not (to_addr_str or from_addr_str):
        return {"success": False, "error": {
            "code": "MISSING_PARAMETER",
            "message": "Either 'to_addr' or 'from_addr' parameter is required"
        }}

    def wrong_type(ref):
        """True when a type filter is set and ref's type name differs from it."""
        if not type_filter:
            return False
        return self._get_ref_type_name(ref.getReferenceType()).lower() != type_filter.lower()

    collected = []
    ref_mgr = self.program.getReferenceManager()

    try:
        if to_addr_str:
            target = self.program.getAddressFactory().getAddress(to_addr_str)
            incoming = ref_mgr.getReferencesTo(target)
            taken = 0
            passed_over = 0
            while incoming.hasNext() and taken < limit:
                ref = incoming.next()
                if wrong_type(ref):
                    continue
                if passed_over < offset:
                    passed_over += 1
                    continue
                entry = self._build_xref_info(ref)
                if grep_matches_item(entry, grep_pattern):
                    collected.append(entry)
                    taken += 1

        if from_addr_str:
            origin = self.program.getAddressFactory().getAddress(from_addr_str)
            outgoing = ref_mgr.getReferencesFrom(origin)
            taken = 0
            passed_over = 0
            for ref in outgoing:
                if taken >= limit:
                    break
                if wrong_type(ref):
                    continue
                if passed_over < offset:
                    passed_over += 1
                    continue
                entry = self._build_xref_info(ref)
                if grep_matches_item(entry, grep_pattern):
                    collected.append(entry)
                    taken += 1

        return {"success": True, "result": collected, "offset": offset, "limit": limit}
    except Exception as e:
        return {"success": False, "error": {"code": "XREF_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Classes & Namespaces Handlers
|
|
# ==================================================================
|
|
|
|
def handle_classes(self, exchange):
    """List qualified names of non-global parent namespaces of all symbols.

    A parent namespace is included when its own symbol's type reports
    isNamespace(). Names are sorted and then paginated with limit (default
    100) and offset (default 0); 'size' is the total before pagination.
    Each entry splits the name at its last "::" into namespace/simpleName.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)

    qualified = set()
    for symbol in self.program.getSymbolTable().getAllSymbols(True):
        parent = symbol.getParentNamespace()
        if not parent or parent.isGlobal():
            continue
        if parent.getSymbol().getSymbolType().isNamespace():
            qualified.add(parent.getName(True))

    ordered = sorted(qualified)
    total = len(ordered)
    start = min(offset, total)
    end = min(offset + limit, total)

    page = []
    for name in ordered[start:end]:
        if "::" in name:
            outer, _, simple = name.rpartition("::")
        else:
            outer, simple = "global", name
        entry = {"name": name}
        entry["namespace"] = outer
        entry["simpleName"] = simple
        page.append(entry)

    return {
        "success": True,
        "result": page,
        "size": total,
        "offset": offset,
        "limit": limit,
    }
|
|
|
|
def handle_namespaces(self, exchange):
    """List qualified names of every non-global parent namespace in use.

    Names are collected from the parent namespace of each symbol, sorted,
    and paginated with limit (default 100) and offset (default 0); 'size'
    is the total before pagination.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)

    seen = set()
    for symbol in self.program.getSymbolTable().getAllSymbols(True):
        parent = symbol.getParentNamespace()
        if not parent or parent.isGlobal():
            continue
        seen.add(parent.getName(True))

    ordered = sorted(seen)
    total = len(ordered)
    start = min(offset, total)
    end = min(offset + limit, total)

    return {
        "success": True,
        "result": ordered[start:end],
        "size": total,
        "offset": offset,
        "limit": limit,
    }
|
|
|
|
# ==================================================================
|
|
# Structs Handlers
|
|
# ==================================================================
|
|
|
|
def handle_structs(self, exchange):
    """List structure/union data types, or return one struct's detail.

    When the 'name' query parameter is present the request is handed to
    _handle_struct_detail(). Otherwise all data types are scanned and the
    Structure/Union ones listed. Query parameters: limit (default 100),
    offset (default 0), category (case-insensitive substring of the
    category path), plus whatever compile_grep() reads.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)

    # Detail lookup by name
    requested = params.get("name")
    if requested:
        return self._handle_struct_detail(requested)

    # List structs
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    category_filter = params.get("category")
    grep_pattern = compile_grep(params)

    from ghidra.program.model.data import Structure, Union

    collected = []
    dtm = self.program.getDataTypeManager()
    taken = 0
    passed_over = 0

    walker = dtm.getAllDataTypes()
    while walker.hasNext() and taken < limit:
        dt = walker.next()
        if not isinstance(dt, (Structure, Union)):
            continue
        if category_filter and category_filter.lower() not in dt.getCategoryPath().getPath().lower():
            continue
        if passed_over < offset:
            passed_over += 1
            continue
        if isinstance(dt, Structure):
            kind = "struct"
        else:
            kind = "union"
        entry = {
            "name": dt.getName(),
            "category": dt.getCategoryPath().getPath(),
            "path": dt.getPathName(),
            "size": dt.getLength(),
            "type": kind,
            "numFields": dt.getNumComponents(),
            "_links": {"self": make_link("/structs?name=%s" % dt.getName())},
        }
        if grep_matches_item(entry, grep_pattern):
            collected.append(entry)
            taken += 1

    return {"success": True, "result": collected, "offset": offset, "limit": limit}
|
|
|
|
def _handle_struct_detail(self, name):
    """Describe one data type found by _find_struct(name).

    Always reports name, category, path, size and description; when the
    type is a Composite its components are listed under 'fields' (with
    'numFields'). A component's comment is only included when non-empty.
    """
    from ghidra.program.model.data import Composite
    dt = self._find_struct(name)
    if not dt:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}}

    def describe_component(comp):
        member_type = comp.getDataType()
        field = {
            "name": comp.getFieldName(),
            "type": member_type.getName(),
            "typePath": member_type.getPathName(),
            "offset": comp.getOffset(),
            "length": comp.getLength(),
        }
        note = comp.getComment()
        if note:
            field["comment"] = note
        return field

    result = {
        "name": dt.getName(),
        "category": dt.getCategoryPath().getPath(),
        "path": dt.getPathName(),
        "size": dt.getLength(),
        "description": dt.getDescription() or "",
    }

    if isinstance(dt, Composite):
        members = [describe_component(comp) for comp in dt.getComponents()]
        result["fields"] = members
        result["numFields"] = len(members)

    result["_links"] = {
        "self": make_link("/structs?name=%s" % name),
        "structs": make_link("/structs"),
    }
    return {"success": True, "result": result}
|
|
|
|
def handle_struct_create(self, exchange):
    """Create an empty structure data type from a JSON request body.

    Body fields: 'name' (required), 'category' (default "/") and
    'description' (default ""). The structure is added to the program's
    data type manager inside a transaction; on success a (payload, 201)
    tuple is returned. Failures are reported as STRUCT_ERROR.
    """
    if not self.program:
        return self._no_program()
    payload = parse_json_body(exchange)
    name = payload.get("name")
    if not name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}}

    category = payload.get("category", "/")
    description = payload.get("description", "")

    try:
        from ghidra.program.model.data import StructureDataType, CategoryPath, DataTypeConflictHandler
        draft = StructureDataType(CategoryPath(category), name, 0)
        if description:
            draft.setDescription(description)

        # Filled in by the transaction body with the data type actually stored.
        stored = [None]

        def do_create():
            manager = self.program.getDataTypeManager()
            stored[0] = manager.addDataType(draft, DataTypeConflictHandler.DEFAULT_HANDLER)

        with_transaction(self.program, "Create struct", do_create)
        dt = stored[0]
        if dt:
            final_name = dt.getName()
            final_path = dt.getPathName()
        else:
            final_name = name
            final_path = "%s/%s" % (category, name)
        return ({"success": True, "result": {
            "name": final_name,
            "path": final_path,
            "category": category,
            "size": 0,
            "message": "Struct created successfully",
        }}, 201)
    except Exception as e:
        return {"success": False, "error": {"code": "STRUCT_ERROR", "message": str(e)}}
|
|
|
|
def handle_struct_addfield(self, exchange):
    """Add a field to an existing struct from a JSON request body.

    Body fields: 'struct', 'fieldName' and 'fieldType' (all required),
    plus optional 'offset' and 'comment'. With 'offset' the field is
    inserted at that offset; without it the field is appended. The
    reported 'offset' is taken from the component that Ghidra returns, so
    it reflects where the field really landed. Error payloads use
    MISSING_PARAMETER, STRUCT_NOT_FOUND, UNKNOWN_TYPE or FIELD_ERROR.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    struct_name = body.get("struct")
    field_name = body.get("fieldName")
    field_type = body.get("fieldType")

    if not struct_name or not field_name or not field_type:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct', 'fieldName', or 'fieldType'"}}

    struct = self._find_struct(struct_name)
    if not struct:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}}

    dtm = self.program.getDataTypeManager()
    dt = resolve_data_type(dtm, field_type)
    if not dt:
        return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % field_type}}

    field_offset = body.get("offset")
    comment = body.get("comment")
    result_info = {}

    def do_add():
        if field_offset is not None:
            requested = int(field_offset)
            comp = struct.insertAtOffset(requested, dt, dt.getLength(), field_name, comment)
            fallback_offset = requested
        else:
            comp = struct.add(dt, dt.getLength(), field_name, comment)
            fallback_offset = struct.getLength() - dt.getLength()
        # Prefer the offset recorded on the new component: subtracting the
        # field length from the struct length is wrong whenever the struct
        # ends in padding.
        if comp is not None:
            result_info["offset"] = comp.getOffset()
        else:
            result_info["offset"] = fallback_offset
        result_info["length"] = dt.getLength()
        result_info["structSize"] = struct.getLength()

    try:
        with_transaction(self.program, "Add struct field", do_add)
        return {"success": True, "result": {
            "struct": struct_name,
            "fieldName": field_name,
            "fieldType": field_type,
            "offset": result_info.get("offset", 0),
            "length": result_info.get("length", 0),
            "structSize": result_info.get("structSize", 0),
            "message": "Field added successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "FIELD_ERROR", "message": str(e)}}
|
|
|
|
def handle_struct_updatefield(self, exchange):
    """Rename, retype and/or re-comment one field of an existing struct.

    Body keys read: 'struct' (required); 'fieldOffset' or 'fieldName' to
    select the field (offset wins when both are given); and at least one of
    'newName', 'newType', 'newComment'.

    Returns a success dict with the original and new values, or an error
    dict with code MISSING_PARAMETER, STRUCT_NOT_FOUND, INVALID_PARAMETER,
    FIELD_NOT_FOUND, UNKNOWN_TYPE or UPDATE_ERROR.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    struct_name = body.get("struct")
    if not struct_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct'"}}

    struct = self._find_struct(struct_name)
    if not struct:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}}

    field_name = body.get("fieldName")
    field_offset = body.get("fieldOffset")
    new_name = body.get("newName")
    new_type = body.get("newType")
    new_comment = body.get("newComment")

    if not new_name and not new_type and new_comment is None:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide at least one of: newName, newType, newComment"}}

    # Find the component
    component = None
    if field_offset is not None:
        try:
            offset_value = int(field_offset)
        except (ValueError, TypeError):
            # A malformed offset is a client error, not a server fault.
            return {"success": False, "error": {"code": "INVALID_PARAMETER", "message": "Invalid 'fieldOffset': %s" % (field_offset,)}}
        component = struct.getComponentAt(offset_value)
    elif field_name:
        for comp in struct.getComponents():
            if comp.getFieldName() == field_name:
                component = comp
                break

    if not component:
        return {"success": False, "error": {"code": "FIELD_NOT_FOUND", "message": "Field not found"}}

    # Resolve the requested type before touching anything, so an unknown
    # type is reported instead of being skipped while the response still
    # claims the retype succeeded.
    new_dt = None
    if new_type:
        new_dt = resolve_data_type(self.program.getDataTypeManager(), new_type)
        if not new_dt:
            return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % new_type}}

    orig_name = component.getFieldName()
    orig_type = component.getDataType().getName()
    orig_comment = component.getComment()

    def do_update():
        if new_name:
            component.setFieldName(new_name)
        if new_comment is not None:
            component.setComment(new_comment)
        if new_dt:
            component.setDataType(new_dt)

    try:
        with_transaction(self.program, "Update struct field", do_update)
        return {"success": True, "result": {
            "struct": struct_name,
            "offset": component.getOffset(),
            "originalName": orig_name,
            "originalType": orig_type,
            "originalComment": orig_comment,
            "newName": new_name or orig_name,
            "newType": new_type or orig_type,
            "newComment": new_comment if new_comment is not None else orig_comment,
            "length": component.getLength(),
            "message": "Field updated successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "UPDATE_ERROR", "message": str(e)}}
|
|
|
|
def handle_struct_delete(self, exchange):
    """Delete a struct data type named by the 'name' key of the JSON body.

    Returns a success dict with the removed struct's name, path and
    category, or an error dict with code MISSING_PARAMETER,
    STRUCT_NOT_FOUND or DELETE_ERROR.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    name = body.get("name")
    if not name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}}

    dt = self._find_struct(name)
    if not dt:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}}

    # Capture identifying details before the type is removed.
    category = dt.getCategoryPath().getPath()
    path = dt.getPathName()

    # One-element list: Python 2 closures cannot rebind outer names.
    removed = [False]

    def do_delete():
        dtm = self.program.getDataTypeManager()
        removed[0] = dtm.remove(dt, getMonitor())

    try:
        with_transaction(self.program, "Delete struct", do_delete)
    except Exception as e:
        return {"success": False, "error": {"code": "DELETE_ERROR", "message": str(e)}}

    # remove() reports failure through its return value rather than an
    # exception; do not claim success when nothing was removed.
    if not removed[0]:
        return {"success": False, "error": {"code": "DELETE_ERROR", "message": "Struct could not be removed: %s" % name}}

    return {"success": True, "result": {
        "name": name,
        "path": path,
        "category": category,
        "message": "Struct deleted successfully",
    }}
|
|
|
|
# ==================================================================
|
|
# Analysis Handlers
|
|
# ==================================================================
|
|
|
|
def handle_analysis_info(self, exchange):
    """Describe the loaded program and the analysis endpoints on offer.

    Returns a success dict with the program name, processor, default
    address-space size, language string, the list of available analyses
    and navigation links.
    """
    program = self.program
    if not program:
        return self._no_program()

    language = program.getLanguage()
    default_space = program.getAddressFactory().getDefaultAddressSpace()

    links = {
        "self": make_link("/analysis"),
        "callgraph": make_link("/analysis/callgraph"),
        "dataflow": make_link("/analysis/dataflow"),
        "program": make_link("/program"),
    }
    info = {
        "program": program.getName(),
        "processor": str(language.getProcessor()),
        "addressSize": default_space.getSize(),
        "programLanguage": str(language),
        "availableAnalysis": ["callgraph", "xrefs", "decompile", "dataflow"],
        "_links": links,
    }
    return {"success": True, "result": info}
|
|
|
|
def handle_analysis_run(self, exchange):
    """Trigger auto-analysis on the loaded program.

    Always answers with success=True. 'analysis_triggered' tells the caller
    whether analyzeAll() actually ran; when it raised, the exception text
    is folded into the message instead of being reported as an error.
    """
    if not self.program:
        return self._no_program()
    try:
        # In headless mode, try to re-run auto-analysis.
        # No import is needed here: analyzeAll is called as a bare name,
        # so it must already be in scope when this runs.
        analyzeAll(self.program)
        return {"success": True, "result": {
            "program": self.program.getName(),
            "analysis_triggered": True,
            "message": "Analysis initiated on program",
        }}
    except Exception as e:
        # analyzeAll may not be available; graceful fallback
        return {"success": True, "result": {
            "program": self.program.getName(),
            "analysis_triggered": False,
            "message": "Analysis request received (headless mode: %s)" % str(e),
        }}
|
|
|
|
def handle_callgraph(self, exchange):
    """Build a call graph rooted at one function.

    Query parameters: 'address' or 'name' select the root ('address' wins
    when both are given); 'max_depth' bounds the traversal (default 3).
    With neither selector, the root is the first external entry point that
    is the start of a function.

    Returns a success dict holding the root, the node and edge lists and
    their counts, or a FUNCTION_NOT_FOUND error dict.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    func_name = params.get("name")
    func_addr = params.get("address")
    max_depth = parse_int(params.get("max_depth"), 3)

    start_func = None
    if func_addr:
        start_func = self._find_function_at(func_addr)
    elif func_name:
        start_func = self._find_function_by_name(func_name)
    else:
        # Walk the entry points until one is a function start. Trying only
        # the first one fails whenever it is not a function, even though a
        # later entry point would have worked.
        fm = self.program.getFunctionManager()
        entry_iter = self.program.getSymbolTable().getExternalEntryPointIterator()
        while not start_func and entry_iter.hasNext():
            start_func = fm.getFunctionAt(entry_iter.next())

    if not start_func:
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found for callgraph"}}

    nodes = []
    edges = []
    visited = set()

    self._build_callgraph(start_func, nodes, edges, visited, 0, max_depth)

    graph = {
        "root": start_func.getName(),
        "rootAddress": str(start_func.getEntryPoint()),
        "maxDepth": max_depth,
        "nodes": nodes,
        "edges": edges,
        "nodeCount": len(nodes),
        "edgeCount": len(edges),
    }
    return {"success": True, "result": graph}
|
|
|
|
def _build_callgraph(self, func, nodes, edges, visited, depth, max_depth):
    """Recursively record `func` and the functions it calls.

    Appends one node per function (keyed by entry-point string) to `nodes`
    and one edge per call reference to `edges`; `visited` holds the entry
    points already recorded. Recursion stops at `max_depth`.

    An edge is recorded for every call reference, including calls to
    functions that were already visited, so `edges` may contain repeats.
    """
    entry = str(func.getEntryPoint())
    if entry in visited:
        return
    visited.add(entry)

    nodes.append({
        "id": entry,
        "name": func.getName(),
        "address": entry,
        "depth": depth,
    })

    # The node itself is recorded at the depth limit, but not its callees.
    if depth >= max_depth:
        return

    references = self.program.getReferenceManager()
    addresses = func.getBody().getAddresses(True)

    while addresses.hasNext():
        call_site = addresses.next()
        for ref in references.getReferencesFrom(call_site):
            if not ref.getReferenceType().isCall():
                continue
            callee = self.program.getFunctionManager().getFunctionAt(ref.getToAddress())
            if not callee:
                continue
            edges.append({
                "from": entry,
                "to": str(callee.getEntryPoint()),
                "type": "call",
                "callSite": str(call_site),
            })
            # Descend immediately (depth-first) before scanning further.
            self._build_callgraph(callee, nodes, edges, visited, depth + 1, max_depth)
|
|
|
|
def handle_dataflow(self, exchange):
    """List the instructions walked from an address within one function.

    Query parameters: 'address' (required), 'direction' (default
    "forward"; any other value walks backward) and 'max_steps'
    (default 50).

    The walk stops at the step limit or at the first instruction outside
    the containing function's body. 'sources' and 'sinks' are always
    returned empty.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    addr_str = query.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}}

    direction = query.get("direction", "forward")
    max_steps = parse_int(query.get("max_steps"), 50)

    try:
        start = self.program.getAddressFactory().getAddress(addr_str)
        func = self.program.getFunctionManager().getFunctionContaining(start)
        if not func:
            return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function containing address: %s" % addr_str}}

        # Simplified dataflow: walk instructions from the starting address
        walk_forward = direction == "forward"
        instructions = self.program.getListing().getInstructions(start, walk_forward)

        steps = []
        while instructions.hasNext() and len(steps) < max_steps:
            instr = instructions.next()
            # Stay within function body
            if not func.getBody().contains(instr.getAddress()):
                break
            steps.append({
                "address": str(instr.getAddress()),
                "instruction": str(instr),
                "mnemonic": instr.getMnemonicString(),
                "bytes": bytes_to_hex(instr.getBytes()),
            })

        summary = {
            "start_address": addr_str,
            "function": func.getName(),
            "direction": direction,
            "max_steps": max_steps,
            "total_steps": len(steps),
            "steps": steps,
            "sources": [],
            "sinks": [],
        }
        return {"success": True, "result": summary}
    except Exception as e:
        return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Symbol CRUD Handlers
|
|
# ==================================================================
|
|
|
|
def handle_symbol_create(self, exchange):
    """POST /symbols - Create a new label/symbol.

    Body keys read: 'name' and 'address' (both required). The label is
    created with SourceType.USER_DEFINED. Returns a success dict echoing
    the name and address, or a MISSING_PARAMETER / SYMBOL_ERROR error dict.
    """
    if not self.program:
        return self._no_program()

    payload = parse_json_body(exchange)
    name = payload.get("name", "")
    address = payload.get("address", "")
    if not (name and address):
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Both 'name' and 'address' are required"}}

    try:
        target = self.program.getAddressFactory().getAddress(address)
        symbols = self.program.getSymbolTable()

        def do_create():
            symbols.createLabel(target, name, SourceType.USER_DEFINED)

        with_transaction(self.program, "Create symbol", do_create)
    except Exception as e:
        return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}

    return {"success": True, "result": {
        "name": name,
        "address": address,
        "message": "Symbol created successfully",
    }}
|
|
|
|
def handle_symbol_rename(self, exchange, addr_str):
    """PATCH /symbols/{address} - Rename primary symbol at address.

    Body key read: 'name' (required), the new symbol name. Returns a
    success dict with the old and new names, or an error dict with code
    MISSING_PARAMETER, NOT_FOUND or SYMBOL_ERROR.
    """
    if not self.program:
        return self._no_program()

    new_name = parse_json_body(exchange).get("name", "")
    if not new_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}

    try:
        location = self.program.getAddressFactory().getAddress(addr_str)
        symbol = self.program.getSymbolTable().getPrimarySymbol(location)
        if not symbol:
            return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}

        # Remember the current name before it is overwritten.
        old_name = symbol.getName()

        def do_rename():
            symbol.setName(new_name, SourceType.USER_DEFINED)

        with_transaction(self.program, "Rename symbol", do_rename)

        outcome = {
            "address": addr_str,
            "oldName": old_name,
            "newName": new_name,
            "message": "Symbol renamed successfully",
        }
        return {"success": True, "result": outcome}
    except Exception as e:
        return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
|
|
|
|
def handle_symbol_delete(self, exchange, addr_str):
    """DELETE /symbols/{address} - Delete primary symbol at address.

    Returns a success dict with the address and the deleted symbol's name,
    or an error dict with code NOT_FOUND or SYMBOL_ERROR. SYMBOL_ERROR is
    also returned when the symbol reports that it was not deleted.
    """
    if not self.program:
        return self._no_program()

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        st = self.program.getSymbolTable()
        symbol = st.getPrimarySymbol(addr)
        if not symbol:
            return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}

        # Read the name first; it may not be available after deletion.
        name = symbol.getName()

        # One-element list: Python 2 closures cannot rebind outer names.
        deleted = [False]

        def do_delete():
            deleted[0] = symbol.delete()

        with_transaction(self.program, "Delete symbol", do_delete)

        # delete() signals failure through its return value, not an
        # exception, so check it before reporting success.
        if not deleted[0]:
            return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": "Symbol could not be deleted: %s" % name}}

        return {"success": True, "result": {
            "address": addr_str,
            "name": name,
            "message": "Symbol deleted successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Variable Rename Handler
|
|
# ==================================================================
|
|
|
|
def handle_variable_rename(self, exchange, addr_str, var_name):
    """PATCH /functions/{address}/variables/{name} - Rename/retype a variable.

    `var_name` is URL-decoded before lookup. Body keys read: 'name'
    (required, the new variable name) and 'data_type' (optional new type).

    The variable is searched among the function's parameters first, then
    among all of its variables. Returns a success dict with the old and
    new names, or an error dict with code MISSING_PARAMETER,
    FUNCTION_NOT_FOUND, NOT_FOUND, UNKNOWN_TYPE or VARIABLE_ERROR.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    new_name = body.get("name", "")
    new_type = body.get("data_type")

    if not new_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}

    try:
        # URL-decode the variable name (URLDecoder is imported at file level)
        decoded_name = URLDecoder.decode(var_name, "UTF-8")

        func = self._find_function_at(addr_str)
        if not func:
            return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}}

        def find_named(candidates):
            for candidate in candidates:
                if candidate.getName() == decoded_name:
                    return candidate
            return None

        # Search parameters and local variables
        target_var = find_named(func.getParameters()) or find_named(func.getAllVariables())
        if not target_var:
            return {"success": False, "error": {"code": "NOT_FOUND", "message": "Variable '%s' not found in function" % decoded_name}}

        # Resolve the type before changing anything. An unknown type must
        # be reported, not dropped after the rename has already happened.
        new_dt = None
        if new_type:
            new_dt = resolve_data_type(self.program.getDataTypeManager(), new_type)
            if not new_dt:
                return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % new_type}}

        old_name = target_var.getName()

        def do_rename():
            target_var.setName(new_name, SourceType.USER_DEFINED)
            if new_dt:
                target_var.setDataType(new_dt, SourceType.USER_DEFINED)

        with_transaction(self.program, "Rename variable", do_rename)
        return {"success": True, "result": {
            "function": addr_str,
            "oldName": old_name,
            "newName": new_name,
            "message": "Variable renamed successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "VARIABLE_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Variables Handler
|
|
# ==================================================================
|
|
|
|
def handle_variables(self, exchange):
    """GET /variables - List global and function variables.

    Query parameters: 'limit' (default 100), 'offset' (default 0),
    'global_only' ("true" skips function parameters and locals), 'search'
    (case-insensitive substring match on the variable name) plus whatever
    compile_grep() reads from the parameters.

    Function parameters and locals are listed first, then defined data
    that has a primary symbol. 'offset' and 'limit' both apply to the
    fully filtered stream (search and grep), so successive pages never
    overlap.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    global_only = params.get("global_only", "false").lower() == "true"
    needle = params.get("search", "").lower()
    grep_pattern = compile_grep(params)

    variables = []
    # One-element list: Python 2 closures cannot rebind outer names.
    to_skip = [offset]

    def page_full():
        return len(variables) >= limit

    def name_matches(name):
        return not needle or needle in name.lower()

    def offer(item):
        # Filter first, then consume the offset, so 'offset' counts only
        # items that would actually be returned.
        if not grep_matches_item(item, grep_pattern):
            return
        if to_skip[0] > 0:
            to_skip[0] -= 1
            return
        variables.append(item)

    def offer_function_vars(func_vars, scope, func_name, func_addr):
        for var in func_vars:
            if page_full():
                return
            var_name = var.getName()
            if not name_matches(var_name):
                continue
            offer({
                "name": var_name,
                "type": str(var.getDataType()),
                "storage": str(var.getVariableStorage()),
                "scope": scope,
                "function": func_name,
                "functionAddress": func_addr,
            })

    # Function variables (parameters + locals)
    if not global_only:
        fm = self.program.getFunctionManager()
        for func in fm.getFunctions(True):
            if page_full():
                break
            func_name = func.getName()
            func_addr = str(func.getEntryPoint())
            offer_function_vars(func.getParameters(), "parameter", func_name, func_addr)
            offer_function_vars(func.getLocalVariables(), "local", func_name, func_addr)

    # Global variables (defined data with symbol names)
    listing = self.program.getListing()
    st = self.program.getSymbolTable()
    for data in listing.getDefinedData(True):
        if page_full():
            break
        addr = data.getAddress()
        symbol = st.getPrimarySymbol(addr)
        if not symbol:
            continue
        sym_name = symbol.getName()
        # Skip auto-generated names
        if sym_name.startswith(("DAT_", "s_")):
            continue
        if not name_matches(sym_name):
            continue
        offer({
            "name": sym_name,
            "address": str(addr),
            "type": str(data.getDataType()),
            "scope": "global",
            "size": data.getLength(),
        })

    return {"success": True, "result": variables, "offset": offset, "limit": limit}
|
|
|
|
# ==================================================================
|
|
# Bookmarks Handlers
|
|
# ==================================================================
|
|
|
|
def handle_bookmarks(self, exchange):
    """GET /bookmarks - List bookmarks with optional filtering.

    Query parameters: 'limit' (default 100), 'offset' (default 0), 'type'
    (restrict to one bookmark type), 'category' (exact match) plus
    whatever compile_grep() reads from the parameters.

    'offset' and 'limit' both apply to the fully filtered stream
    (category and grep), so successive pages never overlap.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    type_filter = params.get("type")
    category_filter = params.get("category")
    grep_pattern = compile_grep(params)

    bm = self.program.getBookmarkManager()
    bookmarks = []
    skipped = 0

    # Get bookmark types to iterate
    if type_filter:
        bm_types = [type_filter]
    else:
        # Use the type's own name accessor; str() depends on toString(),
        # which is not guaranteed to be the type string.
        bm_types = [t.getTypeString() for t in bm.getBookmarkTypes()]

    for btype in bm_types:
        if len(bookmarks) >= limit:
            break
        try:
            it = bm.getBookmarksIterator(btype)
        except:
            # Best effort: a type that cannot be iterated is skipped.
            # NOTE(review): the bare except is kept on purpose. It is
            # unconfirmed whether `except Exception` catches Java
            # exceptions under Jython, so narrowing could change behavior.
            continue
        while it.hasNext() and len(bookmarks) < limit:
            bookmark = it.next()
            if category_filter and bookmark.getCategory() != category_filter:
                continue
            item = {
                "address": str(bookmark.getAddress()),
                "type": bookmark.getTypeString(),
                "category": bookmark.getCategory(),
                "comment": bookmark.getComment(),
            }
            # Filter first, then consume the offset, so 'offset' counts
            # only items that would actually be returned.
            if not grep_matches_item(item, grep_pattern):
                continue
            if skipped < offset:
                skipped += 1
                continue
            bookmarks.append(item)

    return {"success": True, "result": bookmarks, "offset": offset, "limit": limit}
|
|
|
|
def handle_bookmark_create(self, exchange):
    """POST /bookmarks - Create a bookmark.

    Body keys read: 'address' (required), 'type' (default "Note"),
    'category' (default "") and 'comment' (default ""). Returns a success
    dict echoing those values, or a MISSING_PARAMETER / BOOKMARK_ERROR
    error dict.
    """
    if not self.program:
        return self._no_program()

    payload = parse_json_body(exchange)
    address = payload.get("address", "")
    btype = payload.get("type", "Note")
    category = payload.get("category", "")
    comment = payload.get("comment", "")

    if not address:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'address' is required"}}

    try:
        location = self.program.getAddressFactory().getAddress(address)
        manager = self.program.getBookmarkManager()

        def do_create():
            manager.setBookmark(location, btype, category, comment)

        with_transaction(self.program, "Create bookmark", do_create)
    except Exception as e:
        return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}

    created = {
        "address": address,
        "type": btype,
        "category": category,
        "comment": comment,
        "message": "Bookmark created successfully",
    }
    return {"success": True, "result": created}
|
|
|
|
def handle_bookmark_delete(self, exchange, addr_str):
    """DELETE /bookmarks/{address} - Delete all bookmarks at address.

    Returns a success dict listing the type string of every removed
    bookmark and their count (the count may be zero), or a BOOKMARK_ERROR
    error dict when anything raises.
    """
    if not self.program:
        return self._no_program()

    try:
        location = self.program.getAddressFactory().getAddress(addr_str)
        manager = self.program.getBookmarkManager()
        removed = []

        def do_delete():
            # Copy into a list first so removal does not disturb iteration.
            for mark in list(manager.getBookmarks(location)):
                removed.append(mark.getTypeString())
                if hasattr(mark, 'remove'):
                    mark.remove()
                else:
                    manager.removeBookmark(mark)

        with_transaction(self.program, "Delete bookmarks", do_delete)

        summary = {
            "address": addr_str,
            "removedTypes": removed,
            "count": len(removed),
            "message": "Bookmarks deleted successfully",
        }
        return {"success": True, "result": summary}
    except Exception as e:
        return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Enum Handlers
|
|
# ==================================================================
|
|
|
|
def handle_enums(self, exchange):
    """GET /datatypes/enums - List enum data types.

    Query parameters: 'limit' (default 100), 'offset' (default 0) plus
    whatever compile_grep() reads from the parameters.

    Each item carries the enum's name, category, size and its members as
    name/value pairs. 'offset' and 'limit' both apply to the grep-filtered
    stream, so successive pages never overlap.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    grep_pattern = compile_grep(params)

    from ghidra.program.model.data import Enum as GhidraEnum

    dtm = self.program.getDataTypeManager()
    enums = []
    skipped = 0

    for dt in dtm.getAllDataTypes():
        if len(enums) >= limit:
            break
        if not isinstance(dt, GhidraEnum):
            continue

        # Get enum members
        members = [{"name": name, "value": dt.getValue(name)} for name in dt.getNames()]

        item = {
            "name": dt.getName(),
            "category": str(dt.getCategoryPath()),
            "size": dt.getLength(),
            "members": members,
        }
        # Filter first, then consume the offset, so 'offset' counts only
        # items that would actually be returned.
        if not grep_matches_item(item, grep_pattern):
            continue
        if skipped < offset:
            skipped += 1
            continue
        enums.append(item)

    return {"success": True, "result": enums, "offset": offset, "limit": limit}
|
|
|
|
def handle_enum_create(self, exchange):
    """POST /datatypes/enums - Create a new enum.

    Body keys read: 'name' (required) and 'size' (default 4; must be
    convertible to an integer). Returns a success dict echoing name and
    size, or an error dict with code MISSING_PARAMETER, INVALID_PARAMETER
    or ENUM_ERROR.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    name = body.get("name", "")

    if not name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' is required"}}

    # Parse the size only after the required-field check, and report a
    # malformed value as a client error instead of letting int() raise.
    try:
        size = int(body.get("size", 4))
    except (ValueError, TypeError):
        return {"success": False, "error": {"code": "INVALID_PARAMETER", "message": "'size' must be an integer"}}

    try:
        from ghidra.program.model.data import EnumDataType

        dtm = self.program.getDataTypeManager()
        new_enum = EnumDataType(name, size)

        def do_create():
            dtm.addDataType(new_enum, None)

        with_transaction(self.program, "Create enum", do_create)
        return {"success": True, "result": {
            "name": name,
            "size": size,
            "message": "Enum created successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "ENUM_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Typedef Handlers
|
|
# ==================================================================
|
|
|
|
def handle_typedefs(self, exchange):
    """GET /datatypes/typedefs - List typedef data types.

    Query parameters: 'limit' (default 100), 'offset' (default 0) plus
    whatever compile_grep() reads from the parameters.

    Each item carries the typedef's name, category, base type name (None
    when there is no base type) and size. 'offset' and 'limit' both apply
    to the grep-filtered stream, so successive pages never overlap.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    grep_pattern = compile_grep(params)

    from ghidra.program.model.data import TypeDef

    dtm = self.program.getDataTypeManager()
    typedefs = []
    skipped = 0

    for dt in dtm.getAllDataTypes():
        if len(typedefs) >= limit:
            break
        if not isinstance(dt, TypeDef):
            continue
        item = {
            "name": dt.getName(),
            "category": str(dt.getCategoryPath()),
            "baseType": dt.getBaseDataType().getName() if dt.getBaseDataType() else None,
            "size": dt.getLength(),
        }
        # Filter first, then consume the offset, so 'offset' counts only
        # items that would actually be returned.
        if not grep_matches_item(item, grep_pattern):
            continue
        if skipped < offset:
            skipped += 1
            continue
        typedefs.append(item)

    return {"success": True, "result": typedefs, "offset": offset, "limit": limit}
|
|
|
|
def handle_typedef_create(self, exchange):
    """POST /datatypes/typedefs - Create a new typedef.

    Body keys read: 'name' and 'base_type' (both required). The base type
    is looked up with resolve_data_type(). Returns a success dict echoing
    both values, or an error dict with code MISSING_PARAMETER, NOT_FOUND
    or TYPEDEF_ERROR.
    """
    if not self.program:
        return self._no_program()

    payload = parse_json_body(exchange)
    name = payload.get("name", "")
    base_type_name = payload.get("base_type", "")

    if not (name and base_type_name):
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' and 'base_type' are required"}}

    try:
        from ghidra.program.model.data import TypedefDataType

        manager = self.program.getDataTypeManager()

        # Use the shared resolver which handles builtins + path lookups
        base = resolve_data_type(manager, base_type_name)
        if not base:
            return {"success": False, "error": {"code": "NOT_FOUND", "message": "Base type not found: %s" % base_type_name}}

        pending = TypedefDataType(name, base)

        def do_create():
            manager.addDataType(pending, None)

        with_transaction(self.program, "Create typedef", do_create)

        created = {
            "name": name,
            "baseType": base_type_name,
            "message": "Typedef created successfully",
        }
        return {"success": True, "result": created}
    except Exception as e:
        return {"success": False, "error": {"code": "TYPEDEF_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Legacy Compatibility
|
|
# ==================================================================
|
|
|
|
def handle_decompile_legacy(self, exchange):
    """Handle GET /decompile?name=X or ?address=X (backwards compat).

    The 'name' parameter is preferred over 'address'. Whichever value is
    used is tried as a function name first and as an address second. On
    success the response is flat: 'name', 'address' and 'decompiled' sit
    next to 'success' rather than under a 'result' key.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    name = query.get("name") or query.get("address")
    if not name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name' or 'address' parameter"}}

    # Try by name first, then by address
    func = self._find_function_by_name(name) or self._find_function_at(name)
    if not func:
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}}

    outcome = self.decompiler.decompileFunction(func, 30, getMonitor())
    if not (outcome and outcome.decompileCompleted()):
        return {"success": False, "error": {"code": "DECOMPILE_FAILED", "message": "Decompilation failed"}}

    return {
        "success": True,
        "name": func.getName(),
        "address": str(func.getEntryPoint()),
        "decompiled": outcome.getDecompiledFunction().getC(),
    }
|
|
|
|
|
|
# ========================================================================
|
|
# Server Startup
|
|
# ========================================================================
|
|
|
|
def run_server(port, program, decompiler):
    """Start the HTTP server with a single catch-all handler.

    Creates the server on `port`, routes every path ("/") to one
    GhydraMCPHandler built from `program` and `decompiler`, serves requests
    from a cached thread pool, and returns the started server.
    """
    bind_address = InetSocketAddress(port)
    http_server = HttpServer.create(bind_address, 0)

    catch_all = GhydraMCPHandler(program, decompiler)
    http_server.createContext("/", catch_all)

    worker_pool = Executors.newCachedThreadPool()
    http_server.setExecutor(worker_pool)

    http_server.start()
    println("[GhydraMCP] HTTP server started on port %d" % port)
    return http_server
|
|
|
|
|
|
# ========================================================================
|
|
# Main Entry Point
|
|
# ========================================================================
|
|
|
|
def main():
    """Script entry point: start the HTTP server and block until stopped.

    The port is taken from the first script argument when it parses as an
    integer, otherwise DEFAULT_PORT is used. A decompiler is opened on
    currentProgram and shared with the server. The function then sleeps in
    a loop; however the loop ends, the server is stopped and the
    decompiler is disposed.
    """
    port = DEFAULT_PORT

    # Parse port from script arguments
    args = getScriptArgs()
    if args and len(args) > 0:
        try:
            port = int(args[0])
        except (ValueError, TypeError):
            # Only a malformed port is expected here; anything else should
            # surface rather than be swallowed.
            println("Invalid port number, using default: %d" % DEFAULT_PORT)

    # Initialize decompiler
    decompiler = DecompInterface()
    decompiler.openProgram(currentProgram)

    try:
        println("=========================================")
        println("  GhydraMCP Headless HTTP Server")
        println("=========================================")
        println("  API Version: %s (compat: %d)" % (API_VERSION_STRING, API_VERSION))
        println("  Port: %d" % port)
        println("  Program: %s" % (currentProgram.getName() if currentProgram else "None"))
        println("  Script: Python/Jython (Full API)")
        println("  Routes: %d" % len(ROUTES))
        println("=========================================")

        server = run_server(port, currentProgram, decompiler)

        println("")
        println("GhydraMCP Server running. Press Ctrl+C to stop.")
        println("API available at: http://localhost:%d/" % port)

        # Keep the script running
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            # Stop the listener on every exit path, not only on Ctrl+C, so
            # a dying script does not leave the server running.
            server.stop(0)
            println("[GhydraMCP] Server stopped.")
    finally:
        # Release the decompiler even when server startup fails.
        decompiler.dispose()
|
|
|
|
|
|
# Run
|
|
# Called unconditionally at the end of the file; the header's usage line
# runs this file as an analyzeHeadless -postScript.
main()
|