- Change from 'except Exception' to bare 'except' to catch Java exceptions from Ghidra that don't inherit from Python Exception - Use sys.exc_info() to safely extract error messages when str(e) might fail on certain Java exception types - Add null checks after getAddress() since it can return None instead of throwing for invalid addresses - Add last-resort response handling to prevent silent connection drops when exception handling itself fails These endpoints now return proper JSON error responses instead of causing "Empty reply from server" errors.
2844 lines
108 KiB
Python
2844 lines
108 KiB
Python
# MCGhidraServer.py - Headless Ghidra script for MCGhidra HTTP API
|
|
# Full API parity with the Java plugin implementation.
|
|
# Python 2 / Jython compatible (no f-strings, no readAllBytes).
|
|
#
|
|
# Usage: analyzeHeadless <project> <name> -import <binary> -postScript MCGhidraServer.py [port]
|
|
#
|
|
#@category MCGhidra
|
|
#@keybinding
|
|
#@menupath
|
|
#@toolbar
|
|
|
|
# === Java imports ===
|
|
from com.sun.net.httpserver import HttpServer, HttpHandler
|
|
from java.net import InetSocketAddress, URLDecoder
|
|
from java.util.concurrent import Executors
|
|
from java.io import OutputStream, BufferedReader, InputStreamReader
|
|
|
|
# === Ghidra imports ===
|
|
from ghidra.app.decompiler import DecompInterface
|
|
from ghidra.program.model.symbol import SourceType
|
|
from ghidra.program.model.listing import CodeUnit
|
|
|
|
# === Python imports ===
|
|
import json
|
|
import re
|
|
import threading
|
|
import time
|
|
|
|
# === Constants ===
|
|
# Integer API version for MCP client compatibility (minimum expected: 2).
API_VERSION = 2
# Human-readable version string reported alongside the integer version.
API_VERSION_STRING = "2.1"
# NOTE(review): presumably the port used when no [port] script argument is
# given -- the code that consumes this constant is outside this view.
DEFAULT_PORT = 8192

# === Thread-safe transaction lock ===
# Held by with_transaction() for the whole start/end transaction sequence.
_tx_lock = threading.Lock()
|
|
|
|
|
|
# ========================================================================
|
|
# Utility Functions
|
|
# ========================================================================
|
|
|
|
def url_decode(s):
    """Percent-decode ``s`` as UTF-8 using Java's URLDecoder.

    Returns the input unchanged if decoding raises anything.
    """
    try:
        decoded = URLDecoder.decode(s, "UTF-8")
    except:
        # Bare except kept on purpose: the decoder is a Java call and the
        # failure it raises may not derive from Python's Exception.
        return s
    return decoded
|
|
|
|
|
|
def parse_query_params(exchange):
    """Parse the request's query string into a dict of decoded values.

    The *raw* (still percent-encoded) query is split on ``&`` and ``=``
    first and every piece is decoded afterwards.  Splitting an already
    decoded query would cut values at an encoded ``&``/``=`` and would
    decode each value twice (turning ``%2B`` into a space, for example).

    Args:
        exchange: object exposing ``getRequestURI()`` whose result provides
            ``getRawQuery()`` (an HttpExchange in this script).

    Returns:
        dict mapping decoded parameter names to decoded values.  A
        parameter without ``=`` maps to ``""``; when a name repeats, the
        last occurrence wins.  Empty segments (``a=1&&b=2``) are ignored.
    """
    params = {}
    raw_query = exchange.getRequestURI().getRawQuery()
    if not raw_query:
        return params
    for part in raw_query.split("&"):
        if not part:
            continue
        if "=" in part:
            key, value = part.split("=", 1)
            params[url_decode(key)] = url_decode(value)
        else:
            params[url_decode(part)] = ""
    return params
|
|
|
|
|
|
def read_request_body(exchange):
    """Read the whole request body as text (Jython-compatible, no readAllBytes).

    The body is read line by line as UTF-8 and the lines are re-joined with
    ``"\\n"``, so the original line terminators and any trailing newline
    are not preserved.

    Args:
        exchange: object exposing ``getRequestBody()`` (an HttpExchange).

    Returns:
        The body text; ``""`` when the body is empty.
    """
    reader = BufferedReader(InputStreamReader(exchange.getRequestBody(), "UTF-8"))
    lines = []
    try:
        line = reader.readLine()
        while line is not None:
            lines.append(line)
            line = reader.readLine()
    finally:
        # Close even when readLine() fails so the stream is not leaked.
        reader.close()
    return "\n".join(lines)
|
|
|
|
|
|
def parse_json_body(exchange):
    """Read the request body and parse it as a JSON object.

    Args:
        exchange: object accepted by ``read_request_body``.

    Returns:
        The parsed dict.  ``{}`` is returned when the body is empty or
        whitespace, when it is not valid JSON, or when it is valid JSON
        but not an object (list, string, number, ...), so callers can
        always use ``.get()`` on the result.
    """
    body = read_request_body(exchange)
    if not body or not body.strip():
        return {}
    try:
        parsed = json.loads(body)
    except:
        return {}
    if not isinstance(parsed, dict):
        return {}
    return parsed
|
|
|
|
|
|
def parse_int(value, default):
    """Convert ``value`` with ``int()``, falling back to ``default``.

    ``default`` is returned when ``value`` is None or when ``int(value)``
    raises ValueError or TypeError.
    """
    try:
        return default if value is None else int(value)
    except (ValueError, TypeError):
        return default
|
|
|
|
|
|
def bytes_to_hex(byte_array):
    """Render an iterable of integer byte values as lowercase hex.

    Each value is masked with 0xff first, so signed bytes (for example -1)
    are rendered as their unsigned two-digit form ("ff").
    """
    digits = []
    for value in byte_array:
        digits.append('%02x' % (value & 0xff))
    return ''.join(digits)
|
|
|
|
|
|
def hex_to_bytes(hex_str):
    """Parse a hex string into a list of integer byte values.

    The string is consumed two characters at a time; a trailing single
    character is parsed on its own.  ``int()`` raises ValueError for
    non-hex input.
    """
    return [int(hex_str[pos:pos + 2], 16) for pos in range(0, len(hex_str), 2)]
|
|
|
|
|
|
def make_link(href):
    """Wrap ``href`` in the ``{"href": ...}`` shape used for ``_links`` entries."""
    link = {}
    link["href"] = href
    return link
|
|
|
|
|
|
def compile_grep(params):
    """Build the optional ``grep`` filter from parsed query parameters.

    Args:
        params: dict of query parameters; only the ``"grep"`` key is read.

    Returns:
        A regex compiled with ``re.IGNORECASE``, or None when ``grep`` is
        missing, empty, or does not compile.
    """
    expression = params.get("grep")
    if expression:
        try:
            return re.compile(expression, re.IGNORECASE)
        except:
            # An invalid pattern is treated the same as no pattern.
            pass
    return None
|
|
|
|
|
|
def grep_matches_item(item, pattern):
    """Report whether ``item`` matches the compiled grep ``pattern``.

    Args:
        item: a dict (its top-level values are searched) or any other
            object (its string form is searched).
        pattern: compiled regex, or None meaning "no filter".

    Returns:
        True when ``pattern`` is None or matches.  For a dict, string
        values are searched directly and numeric/bool values through
        ``str()``; values of other types (nested dicts, lists, None) are
        ignored.
    """
    if pattern is None:
        return True
    # On Python 2 / Jython, text coming from Java is ``unicode`` and large
    # integers are ``long``; checking only ``str`` / ``int`` would silently
    # skip those values.
    try:
        text_types = (basestring,)
        number_types = (int, long, float, bool)
    except NameError:
        text_types = (str,)
        number_types = (int, float, bool)
    if isinstance(item, dict):
        for value in item.values():
            if isinstance(value, text_types):
                if pattern.search(value):
                    return True
            elif isinstance(value, number_types):
                if pattern.search(str(value)):
                    return True
        return False
    if isinstance(item, text_types):
        # Search text as-is; str() on non-ASCII unicode can fail on Python 2.
        return bool(pattern.search(item))
    return bool(pattern.search(str(item)))
|
|
|
|
|
|
def with_transaction(program, desc, fn):
    """Run ``fn()`` inside a program transaction while holding ``_tx_lock``.

    Args:
        program: object providing ``startTransaction(desc)`` and
            ``endTransaction(id, commit)``.
        desc: description passed to ``startTransaction``.
        fn: zero-argument callable performing the modification.

    Returns:
        Whatever ``fn()`` returns.

    Raises:
        Re-raises anything raised by ``fn()`` or by the committing
        ``endTransaction`` call, after ending the transaction with
        ``commit=False``.
    """
    with _tx_lock:
        tx_id = program.startTransaction(desc)
        try:
            outcome = fn()
            program.endTransaction(tx_id, True)
        except:
            # Bare except so failures raised from Java are rolled back too.
            program.endTransaction(tx_id, False)
            raise
        return outcome
|
|
|
|
|
|
# ========================================================================
|
|
# Data Type Resolution
|
|
# ========================================================================
|
|
|
|
# Cache for _init_builtin_types(); None until the map has been built.
_BUILTIN_TYPES = None


def _init_builtin_types():
    """Return the lazily built map of lowercase type alias -> DataType.

    The map is assembled in a local dict and assigned to the module-level
    cache only once it is complete, so a concurrent caller never observes
    a half-filled map.  Two threads racing on the first call may both
    build the map; the results are equivalent.

    Returns:
        dict of alias (for example ``"dword"``, ``"uint"``) to the
        ``dataType`` attribute of the matching Ghidra class.  An import
        group that fails with ImportError contributes nothing, and a
        class whose ``dataType`` cannot be read is skipped.
    """
    global _BUILTIN_TYPES
    if _BUILTIN_TYPES is not None:
        return _BUILTIN_TYPES

    types = {}

    def register(pairs):
        # Record each alias; skip classes whose dataType lookup fails.
        for name, cls in pairs:
            try:
                types[name] = cls.dataType
            except:
                pass

    try:
        from ghidra.program.model.data import (
            ByteDataType, WordDataType, DWordDataType, QWordDataType,
            FloatDataType, DoubleDataType, CharDataType,
            ShortDataType, IntegerDataType, LongDataType, LongLongDataType,
            PointerDataType, BooleanDataType,
            StringDataType, UnicodeDataType,
            Undefined1DataType, Undefined2DataType,
            Undefined4DataType, Undefined8DataType
        )
        register([
            ("byte", ByteDataType), ("word", WordDataType),
            ("dword", DWordDataType), ("qword", QWordDataType),
            ("float", FloatDataType), ("double", DoubleDataType),
            ("char", CharDataType), ("short", ShortDataType),
            ("int", IntegerDataType), ("long", LongDataType),
            ("longlong", LongLongDataType),
            ("pointer", PointerDataType), ("bool", BooleanDataType),
            ("boolean", BooleanDataType),
            ("string", StringDataType), ("unicode", UnicodeDataType),
            ("undefined1", Undefined1DataType), ("undefined2", Undefined2DataType),
            ("undefined4", Undefined4DataType), ("undefined8", Undefined8DataType),
        ])
    except ImportError:
        pass

    # Unsigned variants are imported separately so a failure here does not
    # discard the types registered above.
    try:
        from ghidra.program.model.data import (
            UnsignedShortDataType, UnsignedIntegerDataType,
            UnsignedLongDataType, UnsignedLongLongDataType
        )
        register([
            ("ushort", UnsignedShortDataType), ("uint", UnsignedIntegerDataType),
            ("ulong", UnsignedLongDataType), ("ulonglong", UnsignedLongLongDataType),
        ])
    except ImportError:
        pass

    _BUILTIN_TYPES = types
    return _BUILTIN_TYPES
|
|
|
|
|
|
def resolve_data_type(dtm, type_name):
    """Look up a data type by name, trying three sources in order.

    1. ``dtm.getDataType("/" + type_name)`` (exact root-category path).
    2. The builtin alias map, keyed by the lowercased name.
    3. A case-insensitive scan over ``dtm.getAllDataTypes()``; the first
       match wins.

    Args:
        dtm: data type manager to search.
        type_name: name to resolve; a falsy value yields None.

    Returns:
        The matching data type object, or None when nothing matches.
    """
    if not type_name:
        return None

    exact = dtm.getDataType("/" + type_name)
    if exact is not None:
        return exact

    wanted = type_name.lower()
    builtin = _init_builtin_types().get(wanted)
    if builtin is not None:
        return builtin

    # Last resort: walk every data type (expensive).
    candidates = dtm.getAllDataTypes()
    while candidates.hasNext():
        data_type = candidates.next()
        if data_type.getName().lower() == wanted:
            return data_type
    return None
|
|
|
|
|
|
# ========================================================================
|
|
# Comment type mapping
|
|
# ========================================================================
|
|
|
|
# Maps the comment-type names used by the API to CodeUnit comment constants.
COMMENT_TYPE_MAP = dict([
    ("pre", CodeUnit.PRE_COMMENT),
    ("post", CodeUnit.POST_COMMENT),
    ("eol", CodeUnit.EOL_COMMENT),
    ("plate", CodeUnit.PLATE_COMMENT),
    ("repeatable", CodeUnit.REPEATABLE_COMMENT),
])
|
|
|
|
|
|
# ========================================================================
|
|
# Route Table
|
|
# ========================================================================
|
|
|
|
# Route table: (HTTP method, path regex, handler method name).
# Entries are tried top to bottom and the first match wins, so more specific
# patterns are listed before the generic ones they could be confused with.
ROUTES = [
    # --- Root / meta ---
    ("GET", r"^/$", "handle_root"),
    ("GET", r"^/info$", "handle_info"),
    ("GET", r"^/program$", "handle_program"),

    # --- Headless stubs (cursor-dependent, not available headless) ---
    ("GET", r"^/address$", "handle_address_stub"),
    ("GET", r"^/function$", "handle_function_stub"),

    # --- Functions by name (must precede the by-address patterns) ---
    ("GET", r"^/functions/by-name/([^/]+)/decompile$", "handle_decompile_by_name"),
    ("GET", r"^/functions/by-name/([^/]+)/disassembly$", "handle_disassembly_by_name"),
    ("PUT", r"^/functions/by-name/([^/]+)/signature$", "handle_signature_by_name"),
    ("PATCH", r"^/functions/by-name/([^/]+)$", "handle_patch_function_by_name"),
    ("GET", r"^/functions/by-name/([^/]+)$", "handle_function_by_name"),

    # --- Functions by address ---
    ("GET", r"^/functions/([^/]+)/decompile$", "handle_decompile"),
    ("GET", r"^/functions/([^/]+)/disassembly$", "handle_disassembly"),
    ("GET", r"^/functions/([^/]+)/variables$", "handle_function_variables"),
    ("PUT", r"^/functions/([^/]+)/signature$", "handle_signature"),
    ("PATCH", r"^/functions/([^/]+)$", "handle_patch_function"),
    ("GET", r"^/functions/([^/]+)$", "handle_function_detail"),

    # --- Functions list / create ---
    ("GET", r"^/functions/?$", "handle_functions_list"),
    ("POST", r"^/functions/?$", "handle_create_function"),

    # --- Data ---
    ("POST", r"^/data/delete$", "handle_data_delete"),
    ("POST", r"^/data/type$", "handle_data_type"),
    ("GET", r"^/data/strings$", "handle_strings"),
    ("GET", r"^/data/?$", "handle_data_list"),
    ("POST", r"^/data/?$", "handle_data_create"),

    # --- Strings ---
    ("GET", r"^/strings$", "handle_strings"),

    # --- Memory ---
    ("GET", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_get_comment"),
    ("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"),
    ("GET", r"^/memory/blocks$", "handle_memory_blocks"),
    ("GET", r"^/memory$", "handle_memory_read"),
    ("PATCH", r"^/programs/current/memory/([^/]+)$", "handle_memory_write"),

    # --- Segments ---
    ("GET", r"^/segments$", "handle_segments"),

    # --- Symbols ---
    ("GET", r"^/symbols/imports$", "handle_imports"),
    ("GET", r"^/symbols/exports$", "handle_exports"),
    ("POST", r"^/symbols$", "handle_symbol_create"),
    ("PATCH", r"^/symbols/([^/]+)$", "handle_symbol_rename"),
    ("DELETE", r"^/symbols/([^/]+)$", "handle_symbol_delete"),
    ("GET", r"^/symbols$", "handle_symbols"),

    # --- Variables ---
    ("PATCH", r"^/functions/([^/]+)/variables/([^/]+)$", "handle_variable_rename"),
    ("GET", r"^/variables$", "handle_variables"),

    # --- Bookmarks ---
    ("POST", r"^/bookmarks$", "handle_bookmark_create"),
    ("DELETE", r"^/bookmarks/([^/]+)$", "handle_bookmark_delete"),
    ("GET", r"^/bookmarks$", "handle_bookmarks"),

    # --- Data types ---
    ("POST", r"^/datatypes/enums$", "handle_enum_create"),
    ("GET", r"^/datatypes/enums$", "handle_enums"),
    ("POST", r"^/datatypes/typedefs$", "handle_typedef_create"),
    ("GET", r"^/datatypes/typedefs$", "handle_typedefs"),

    # --- Cross-references ---
    ("GET", r"^/xrefs$", "handle_xrefs"),

    # --- Classes & namespaces ---
    ("GET", r"^/classes$", "handle_classes"),
    ("GET", r"^/namespaces$", "handle_namespaces"),

    # --- Structs ---
    ("POST", r"^/structs/create$", "handle_struct_create"),
    ("POST", r"^/structs/addfield$", "handle_struct_addfield"),
    ("POST", r"^/structs/updatefield$", "handle_struct_updatefield"),
    ("POST", r"^/structs/delete$", "handle_struct_delete"),
    ("GET", r"^/structs$", "handle_structs"),

    # --- Analysis ---
    ("GET", r"^/analysis/callgraph$", "handle_callgraph"),
    ("GET", r"^/analysis/dataflow$", "handle_dataflow"),
    ("GET", r"^/analysis$", "handle_analysis_info"),
    ("POST", r"^/analysis$", "handle_analysis_run"),

    # --- Legacy compatibility ---
    ("GET", r"^/decompile$", "handle_decompile_legacy"),
]
|
|
|
|
|
|
# ========================================================================
|
|
# HTTP Handler
|
|
# ========================================================================
|
|
|
|
class MCGhidraHandler(HttpHandler):
|
|
|
|
def __init__(self, program, decompiler):
    """Store the program and decompiler and pre-compile the route table.

    Args:
        program: program served by this handler (may be falsy; handlers
            check ``self.program`` before using it).
        decompiler: object used by the decompile handlers.
    """
    self.program = program
    self.decompiler = decompiler
    # Compile each route regex once here rather than on every request.
    self.routes = [
        (verb, re.compile(regex), attr_name)
        for verb, regex, attr_name in ROUTES
    ]
|
|
|
|
# ------------------------------------------------------------------
|
|
# Dispatch
|
|
# ------------------------------------------------------------------
|
|
|
|
def handle(self, exchange):
    """Dispatch one HTTP request to the first matching route handler.

    ``OPTIONS`` requests get a CORS preflight reply.  Otherwise the routes
    are tried in table order; the first one whose method and pattern match
    is called with the exchange plus the URL-decoded capture groups.  A
    handler may return a payload (sent with status 200) or a
    ``(payload, status)`` tuple.  Unmatched requests get a 404 JSON error.

    Routes are matched against the *raw* (still percent-encoded) path and
    each captured group is decoded exactly once.  Matching the already
    decoded path and decoding the groups again would corrupt encoded
    characters such as ``%2B`` or ``%25`` and would let an encoded ``/``
    split a path segment.

    Any failure is reported as a 500 JSON error; if even that cannot be
    sent, a bare 500 with an empty body is attempted.

    Args:
        exchange: the HttpExchange for this request.
    """
    try:
        method = exchange.getRequestMethod()
        uri = exchange.getRequestURI()

        # CORS preflight
        if method == "OPTIONS":
            self._send_cors_preflight(exchange)
            return

        # Match routes against the undecoded path (see docstring).
        raw_path = uri.getRawPath()
        for route_method, pattern, handler_name in self.routes:
            if method != route_method:
                continue
            match = pattern.match(raw_path)
            if not match:
                continue
            handler = getattr(self, handler_name)
            decoded = tuple(url_decode(g) for g in match.groups())
            result = handler(exchange, *decoded)
            if isinstance(result, tuple):
                response, code = result
            else:
                response, code = result, 200
            self._send_response(exchange, code, response)
            return

        # No route matched; report the decoded path for readability.
        self._send_response(exchange, 404, {
            "success": False,
            "error": {"code": "NOT_FOUND", "message": "Endpoint not found: %s %s" % (method, uri.getPath())}
        })
    except:
        # Catch ALL exceptions including Java exceptions
        import sys
        exc_info = sys.exc_info()
        try:
            # str() itself may fail on some exception objects.
            if exc_info[1] is not None:
                msg = str(exc_info[1])
            else:
                msg = str(exc_info[0])
        except:
            msg = "Unknown exception"
        try:
            self._send_response(exchange, 500, {
                "success": False,
                "error": {"code": "INTERNAL_ERROR", "message": msg}
            })
        except:
            # Last resort - at least don't drop the connection silently
            try:
                exchange.sendResponseHeaders(500, 0)
                exchange.getResponseBody().close()
            except:
                pass
|
|
|
|
def _send_response(self, exchange, code, data):
    """Serialize ``data`` as indented JSON and send it with status ``code``.

    Sets the JSON content type and the CORS headers, sends the headers
    with the exact body length, writes the body and closes the stream.

    Args:
        exchange: the HttpExchange to respond on.
        code: HTTP status code.
        data: JSON-serializable payload.
    """
    response_bytes = json.dumps(data, indent=2).encode('utf-8')
    headers = exchange.getResponseHeaders()
    headers.set("Content-Type", "application/json; charset=utf-8")
    headers.set("Access-Control-Allow-Origin", "*")
    headers.set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS")
    headers.set("Access-Control-Allow-Headers", "Content-Type, X-Request-ID")
    exchange.sendResponseHeaders(code, len(response_bytes))
    body_stream = exchange.getResponseBody()
    try:
        body_stream.write(response_bytes)
    finally:
        # Always close, even if the write fails, so the exchange does not
        # stay open after the headers were already sent.
        body_stream.close()
|
|
|
|
def _send_cors_preflight(self, exchange):
    """Answer a CORS preflight request with 204 and no body.

    Args:
        exchange: the HttpExchange to respond on.
    """
    cors_headers = (
        ("Access-Control-Allow-Origin", "*"),
        ("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS"),
        ("Access-Control-Allow-Headers", "Content-Type, X-Request-ID"),
        ("Access-Control-Max-Age", "86400"),
    )
    response_headers = exchange.getResponseHeaders()
    for header_name, header_value in cors_headers:
        response_headers.set(header_name, header_value)
    # -1 is passed as the response length; no body is written.
    exchange.sendResponseHeaders(204, -1)
    exchange.getResponseBody().close()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
def _no_program(self):
    """Build the standard error payload used when no program is loaded."""
    error = {"code": "NO_PROGRAM_LOADED", "message": "No program loaded"}
    return {"success": False, "error": error}
|
|
|
|
def _find_function_at(self, addr_str):
    """Find the function starting at, or else containing, an address.

    Args:
        addr_str: address text handed to the program's address factory.

    Returns:
        The function whose entry point is the address, otherwise the
        function containing it, otherwise None.  None is also returned
        when the address cannot be parsed or any lookup raises.
    """
    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        if addr is None:
            # getAddress() can return None for an invalid address instead
            # of raising; stop here rather than pass None to the lookups.
            return None
        fm = self.program.getFunctionManager()
        func = fm.getFunctionAt(addr)
        if func is None:
            func = fm.getFunctionContaining(addr)
        return func
    except:
        # Bare except so failures raised from Java are treated as "not found".
        return None
|
|
|
|
def _find_function_by_name(self, name):
    """Return the first function whose name equals ``name`` exactly.

    Functions are scanned in the order given by ``getFunctions(True)``.
    Returns None when no function has that name.
    """
    all_functions = self.program.getFunctionManager().getFunctions(True)
    for candidate in all_functions:
        if candidate.getName() == name:
            return candidate
    return None
|
|
|
|
def _get_function(self, identifier, by_name=False):
    """Resolve a function from a name (``by_name`` true) or an address string.

    Returns whatever the selected lookup returns (None when not found).
    """
    lookup = self._find_function_by_name if by_name else self._find_function_at
    return lookup(identifier)
|
|
|
|
def _decompile_function(self, func):
    """Decompile ``func`` and wrap the outcome in a success payload.

    The returned dict always has ``"success": True``; a failed
    decompilation is reported through an ``"error"`` entry inside
    ``"result"``.  On success the C text is stored under three keys
    (``decompiled``, ``decompiled_text``, ``ccode``) for client
    compatibility, together with the signature.
    """
    # NOTE(review): 60 is decompileFunction's second argument, presumably a
    # timeout in seconds -- confirm against the DecompInterface API.
    outcome = self.decompiler.decompileFunction(func, 60, getMonitor())
    entry = str(func.getEntryPoint())
    payload = {"name": func.getName(), "address": entry}

    if outcome and outcome.decompileCompleted():
        decompiled = outcome.getDecompiledFunction()
        c_source = decompiled.getC()
        # Same text under several keys: MCP client and API spec aliases.
        for key in ("decompiled", "decompiled_text", "ccode"):
            payload[key] = c_source
        payload["signature"] = decompiled.getSignature()
    else:
        reason = outcome.getErrorMessage() if outcome else "Unknown error"
        payload["error"] = "Decompilation failed: %s" % reason

    payload["_links"] = {
        "self": make_link("/functions/%s/decompile" % entry),
        "function": make_link("/functions/%s" % entry),
        "disassembly": make_link("/functions/%s/disassembly" % entry),
    }
    return {"success": True, "result": payload}
|
|
|
|
def _build_disassembly(self, func):
    """Collect the instructions in ``func``'s body as a success payload.

    Each instruction is reported with its address, mnemonic, operands
    (the instruction's string form with the mnemonic prefix removed) and
    raw bytes in hex.  ``disassembly_text`` holds one
    ``"<address> <mnemonic> <operands>"`` line per instruction.
    """
    listing = self.program.getListing()
    cursor = listing.getInstructions(func.getBody(), True)

    instructions = []
    text_lines = []
    while cursor.hasNext():
        insn = cursor.next()
        rendered = str(insn)
        mnemonic = insn.getMnemonicString()
        # Whatever follows the mnemonic in the rendered text is the operands.
        if len(rendered) > len(mnemonic):
            operands = rendered[len(mnemonic):].strip()
        else:
            operands = ""
        location = str(insn.getAddress())
        instructions.append({
            "address": location,
            "mnemonic": mnemonic,
            "operands": operands,
            "bytes": bytes_to_hex(insn.getBytes()),
        })
        # Text form for the MCP client.
        text_lines.append("%s %s %s" % (location, mnemonic, operands))

    entry = str(func.getEntryPoint())
    links = {
        "self": make_link("/functions/%s/disassembly" % entry),
        "function": make_link("/functions/%s" % entry),
        "decompile": make_link("/functions/%s/decompile" % entry),
    }
    details = {
        "name": func.getName(),
        "address": entry,
        "instructions": instructions,
        "instructionCount": len(instructions),
        "disassembly_text": "\n".join(text_lines),
        "_links": links,
    }
    return {"success": True, "result": details}
|
|
|
|
def _build_function_info(self, func):
    """Describe ``func`` as a dict: metadata, parameters, comments, links.

    Comment keys (``preComment``, ``postComment``, ``eolComment``,
    ``plateComment``, ``comment``) are present only when the
    corresponding comment is non-empty.
    """
    entry_point = func.getEntryPoint()
    addr = str(entry_point)

    parameters = [
        {
            "name": param.getName(),
            "type": param.getDataType().getName(),
            "ordinal": param.getOrdinal(),
        }
        for param in func.getParameters()
    ]

    info = {
        "name": func.getName(),
        "address": addr,
        "signature": str(func.getSignature()),
        "returnType": func.getReturnType().getName(),
        "parameterCount": func.getParameterCount(),
        "callingConvention": func.getCallingConventionName(),
        "isThunk": func.isThunk(),
        "isExternal": func.isExternal(),
        "bodySize": func.getBody().getNumAddresses(),
        "parameters": parameters,
    }

    # Comments attached to the code unit at the entry point.
    code_unit = self.program.getListing().getCodeUnitAt(entry_point)
    if code_unit:
        comment_kinds = (
            (CodeUnit.PRE_COMMENT, "preComment"),
            (CodeUnit.POST_COMMENT, "postComment"),
            (CodeUnit.EOL_COMMENT, "eolComment"),
            (CodeUnit.PLATE_COMMENT, "plateComment"),
        )
        for kind, key in comment_kinds:
            text = code_unit.getComment(kind)
            if text:
                info[key] = text

    # Comment stored on the function itself.
    function_comment = func.getComment()
    if function_comment:
        info["comment"] = function_comment

    # HATEOAS links
    info["_links"] = {
        "self": make_link("/functions/%s" % addr),
        "decompile": make_link("/functions/%s/decompile" % addr),
        "disassembly": make_link("/functions/%s/disassembly" % addr),
        "variables": make_link("/functions/%s/variables" % addr),
        "xrefs_to": make_link("/xrefs?to_addr=%s" % addr),
        "xrefs_from": make_link("/xrefs?from_addr=%s" % addr),
    }
    return info
|
|
|
|
def _build_xref_info(self, ref):
    """Describe a reference: both endpoints, its type, and primary flag.

    ``fromFunction`` / ``toFunction`` are added only when a function
    contains the respective address.
    """
    source = ref.getFromAddress()
    target = ref.getToAddress()
    info = {
        "fromAddress": str(source),
        "toAddress": str(target),
        "type": self._get_ref_type_name(ref.getReferenceType()),
        "isPrimary": ref.isPrimary(),
    }
    functions = self.program.getFunctionManager()
    for key, address in (("fromFunction", source), ("toFunction", target)):
        owner = functions.getFunctionContaining(address)
        if owner:
            info[key] = owner.getName()
    return info
|
|
|
|
def _get_ref_type_name(self, ref_type):
    """Map a reference type to a short label.

    The predicates are tried in a fixed order (call, data, read, write,
    jump) and the first one that holds decides the label; if none holds,
    ``str(ref_type)`` is returned.
    """
    checks = (
        (ref_type.isCall, "CALL"),
        (ref_type.isData, "DATA"),
        (ref_type.isRead, "READ"),
        (ref_type.isWrite, "WRITE"),
        (ref_type.isJump, "JUMP"),
    )
    for predicate, label in checks:
        if predicate():
            return label
    return str(ref_type)
|
|
|
|
def _find_struct(self, name):
    """Return the first Structure or Union data type named exactly ``name``.

    Data types with a matching name but a different kind are skipped.
    Returns None when nothing qualifies.
    """
    all_types = self.program.getDataTypeManager().getAllDataTypes()
    while all_types.hasNext():
        candidate = all_types.next()
        if candidate.getName() != name:
            continue
        # Imported lazily, only once a name match needs the kind check.
        from ghidra.program.model.data import Structure, Union
        if isinstance(candidate, (Structure, Union)):
            return candidate
    return None
|
|
|
|
# ==================================================================
|
|
# Root / Meta Handlers
|
|
# ==================================================================
|
|
|
|
def handle_root(self, exchange):
    """Return the API banner: versions, mode, and top-level links.

    Program name, executable path and language are included only when a
    program is loaded.
    """
    link_targets = (
        ("self", "/"),
        ("info", "/info"),
        ("program", "/program"),
        ("functions", "/functions"),
        ("data", "/data"),
        ("strings", "/strings"),
        ("memory", "/memory"),
        ("segments", "/segments"),
        ("symbols", "/symbols"),
        ("xrefs", "/xrefs"),
        ("classes", "/classes"),
        ("namespaces", "/namespaces"),
        ("structs", "/structs"),
        ("analysis", "/analysis"),
    )
    result = {
        "success": True,
        "api_version": API_VERSION,
        "api_version_string": API_VERSION_STRING,
        "message": "MCGhidra Headless API",
        "mode": "headless",
    }
    program = self.program
    if program:
        result["program"] = program.getName()
        result["file"] = program.getExecutablePath()
        result["language"] = str(program.getLanguageID())
    result["_links"] = dict((rel, make_link(href)) for rel, href in link_targets)
    return result
|
|
|
|
def handle_info(self, exchange):
    """Return a summary of the loaded program (identity, memory, counts).

    Returns the no-program error payload when nothing is loaded.
    """
    program = self.program
    if not program:
        return self._no_program()

    memory = program.getMemory()
    links = {
        "self": make_link("/info"),
        "program": make_link("/program"),
        "functions": make_link("/functions"),
    }
    details = {
        "name": program.getName(),
        "path": program.getExecutablePath(),
        "language": str(program.getLanguageID()),
        "processor": str(program.getLanguage().getProcessor()),
        "addressSize": program.getAddressFactory().getDefaultAddressSpace().getSize(),
        "imageBase": str(program.getImageBase()),
        "minAddress": str(memory.getMinAddress()),
        "maxAddress": str(memory.getMaxAddress()),
        "memorySize": memory.getSize(),
        "functionCount": program.getFunctionManager().getFunctionCount(),
        "mode": "headless",
        "_links": links,
    }
    return {"success": True, "result": details}
|
|
|
|
def handle_program(self, exchange):
    """Return program details, including the compiler spec, with links.

    Returns the no-program error payload when nothing is loaded.
    """
    program = self.program
    if not program:
        return self._no_program()

    memory = program.getMemory()
    function_manager = program.getFunctionManager()
    links = {
        "self": make_link("/program"),
        "functions": make_link("/functions"),
        "symbols": make_link("/symbols"),
        "segments": make_link("/segments"),
        "analysis": make_link("/analysis"),
    }
    details = {
        "name": program.getName(),
        "path": program.getExecutablePath(),
        "language": str(program.getLanguageID()),
        "compiler": str(program.getCompilerSpec().getCompilerSpecID()),
        "processor": str(program.getLanguage().getProcessor()),
        "addressSize": program.getAddressFactory().getDefaultAddressSpace().getSize(),
        "imageBase": str(program.getImageBase()),
        "minAddress": str(memory.getMinAddress()),
        "maxAddress": str(memory.getMaxAddress()),
        "memorySize": memory.getSize(),
        "functionCount": function_manager.getFunctionCount(),
        "_links": links,
    }
    return {"success": True, "result": details}
|
|
|
|
# ==================================================================
|
|
# Headless Stubs
|
|
# ==================================================================
|
|
|
|
def handle_address_stub(self, exchange):
    """Report that the cursor's current address does not exist headless."""
    message = (
        "Current address is not available in headless mode. "
        "Use /functions or /data with address parameters instead."
    )
    return {
        "success": False,
        "error": {"code": "HEADLESS_MODE", "message": message},
    }
|
|
|
|
def handle_function_stub(self, exchange):
    """Report that the cursor's current function does not exist headless."""
    message = (
        "Current function is not available in headless mode. "
        "Use /functions/{address} instead."
    )
    return {
        "success": False,
        "error": {"code": "HEADLESS_MODE", "message": message},
    }
|
|
|
|
# ==================================================================
|
|
# Function Handlers
|
|
# ==================================================================
|
|
|
|
def handle_functions_list(self, exchange):
    """List functions with optional filtering and offset/limit paging.

    Query parameters:
        limit (default 100), offset (default 0): paging window.
        name, name_contains: case-insensitive substring filters on the
            function name.
        name_matches_regex: case-insensitive regex searched in the name;
            an invalid regex yields an INVALID_REGEX error payload.
        addr: return only the function at/containing this address.
        grep: case-insensitive regex matched against each item's values.

    Every filter, including ``grep``, is applied *before* ``offset`` is
    consumed, so that ``offset``/``limit`` page through the filtered
    sequence.  Consuming the offset ahead of the grep filter would make
    consecutive pages overlap whenever ``grep`` rejects items.

    Returns:
        Success payload with ``result`` (list of function summaries),
        ``size`` (total number of functions in the program, unfiltered),
        ``offset``, ``limit`` and paging ``_links``; or the no-program /
        invalid-regex error payload.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    name_filter = params.get("name")
    name_contains = params.get("name_contains")
    name_regex = params.get("name_matches_regex")
    addr_filter = params.get("addr")
    grep_pattern = compile_grep(params)

    # Compile name regex if provided
    name_regex_pat = None
    if name_regex:
        try:
            name_regex_pat = re.compile(name_regex, re.IGNORECASE)
        except:
            return {"success": False, "error": {"code": "INVALID_REGEX", "message": "Invalid regex: %s" % name_regex}}

    fm = self.program.getFunctionManager()
    total = fm.getFunctionCount()

    def summarize(func):
        # One list entry; shared by the single-address and paged paths.
        addr = str(func.getEntryPoint())
        return {
            "name": func.getName(),
            "address": addr,
            "signature": str(func.getSignature()),
            "parameterCount": func.getParameterCount(),
            "isThunk": func.isThunk(),
            "_links": {
                "self": make_link("/functions/%s" % addr),
                "decompile": make_link("/functions/%s/decompile" % addr),
                "disassembly": make_link("/functions/%s/disassembly" % addr),
            },
        }

    # Single function lookup by address
    if addr_filter:
        func = self._find_function_at(addr_filter)
        if not func:
            return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit}
        return {"success": True, "result": [summarize(func)], "size": 1, "offset": 0, "limit": limit}

    # Lowercase the substring filters once instead of per function.
    name_filter_lc = name_filter.lower() if name_filter else None
    name_contains_lc = name_contains.lower() if name_contains else None

    functions = []
    skipped = 0
    for func in fm.getFunctions(True):
        if len(functions) >= limit:
            break
        func_name = func.getName()
        lowered = func_name.lower()
        if name_filter_lc and name_filter_lc not in lowered:
            continue
        if name_contains_lc and name_contains_lc not in lowered:
            continue
        if name_regex_pat and not name_regex_pat.search(func_name):
            continue
        item = None
        if grep_pattern is not None:
            # grep inspects the built item, so build it before paging.
            item = summarize(func)
            if not grep_matches_item(item, grep_pattern):
                continue
        if skipped < offset:
            skipped += 1
            continue
        if item is None:
            item = summarize(func)
        functions.append(item)

    count = len(functions)
    result = {
        "success": True,
        "result": functions,
        "size": total,
        "offset": offset,
        "limit": limit,
        "_links": {
            "self": make_link("/functions?offset=%d&limit=%d" % (offset, limit)),
        },
    }
    if offset + count < total:
        result["_links"]["next"] = make_link("/functions?offset=%d&limit=%d" % (offset + limit, limit))
    if offset > 0:
        result["_links"]["prev"] = make_link("/functions?offset=%d&limit=%d" % (max(0, offset - limit), limit))
    return result
|
|
|
|
def handle_function_detail(self, exchange, addr_str):
    """Return detailed info for the function at (or containing) ``addr_str``.

    Yields the no-program payload when nothing is loaded and a
    FUNCTION_NOT_FOUND payload when the lookup finds nothing.
    """
    if not self.program:
        return self._no_program()
    func = self._find_function_at(addr_str)
    if func:
        return {"success": True, "result": self._build_function_info(func)}
    error = {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}
    return {"success": False, "error": error}
|
|
|
|
def handle_function_by_name(self, exchange, name):
    """Return detailed info for the function named exactly ``name``.

    Yields the no-program payload when nothing is loaded and a
    FUNCTION_NOT_FOUND payload when no function has that name.
    """
    if not self.program:
        return self._no_program()
    func = self._find_function_by_name(name)
    if func:
        return {"success": True, "result": self._build_function_info(func)}
    error = {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}
    return {"success": False, "error": error}
|
|
|
|
# -- Decompile --
|
|
|
|
def handle_decompile(self, exchange, addr_str):
    """Decompile the function at (or containing) ``addr_str``.

    Yields the no-program payload when nothing is loaded and a
    FUNCTION_NOT_FOUND payload when the lookup finds nothing.
    """
    if not self.program:
        return self._no_program()
    func = self._find_function_at(addr_str)
    if func:
        return self._decompile_function(func)
    error = {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}
    return {"success": False, "error": error}
|
|
|
|
def handle_decompile_by_name(self, exchange, name):
    """Decompile the function named exactly ``name``.

    Yields the no-program payload when nothing is loaded and a
    FUNCTION_NOT_FOUND payload when no function has that name.
    """
    if not self.program:
        return self._no_program()
    func = self._find_function_by_name(name)
    if func:
        return self._decompile_function(func)
    error = {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}
    return {"success": False, "error": error}
|
|
|
|
# -- Disassembly --
|
|
|
|
def handle_disassembly(self, exchange, addr_str):
    """Disassemble the function at (or containing) ``addr_str``.

    Yields the no-program payload when nothing is loaded and a
    FUNCTION_NOT_FOUND payload when the lookup finds nothing.
    """
    if not self.program:
        return self._no_program()
    func = self._find_function_at(addr_str)
    if func:
        return self._build_disassembly(func)
    error = {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}
    return {"success": False, "error": error}
|
|
|
|
def handle_disassembly_by_name(self, exchange, name):
    """Disassemble the function named exactly ``name``.

    Yields the no-program payload when nothing is loaded and a
    FUNCTION_NOT_FOUND payload when no function has that name.
    """
    if not self.program:
        return self._no_program()
    func = self._find_function_by_name(name)
    if func:
        return self._build_disassembly(func)
    error = {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}
    return {"success": False, "error": error}
|
|
|
|
# -- Variables --
|
|
|
|
def handle_function_variables(self, exchange, addr_str):
    """List the parameters and local variables of a function.

    Parameters come first (``kind: "parameter"``, with ``ordinal``),
    followed by locals (``kind: "local"``).  A local gets a
    ``stackOffset`` entry only when its stack offset can be read and is
    non-zero.

    Yields the no-program payload when nothing is loaded and a
    FUNCTION_NOT_FOUND payload when the lookup finds nothing.
    """
    if not self.program:
        return self._no_program()
    func = self._find_function_at(addr_str)
    if not func:
        error = {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}
        return {"success": False, "error": error}

    # Parameters
    variables = [
        {
            "name": param.getName(),
            "type": param.getDataType().getName(),
            "storage": str(param.getVariableStorage()),
            "kind": "parameter",
            "ordinal": param.getOrdinal(),
        }
        for param in func.getParameters()
    ]

    # Local variables
    local_vars = func.getLocalVariables()
    for local in local_vars:
        entry = {
            "name": local.getName(),
            "type": local.getDataType().getName(),
            "storage": str(local.getVariableStorage()),
            "kind": "local",
        }
        try:
            stack_offset = local.getStackOffset()
            if stack_offset != 0:
                entry["stackOffset"] = stack_offset
        except:
            # Reading the stack offset can fail; the entry is kept without it.
            pass
        variables.append(entry)

    details = {
        "name": func.getName(),
        "address": str(func.getEntryPoint()),
        "variables": variables,
        "parameterCount": func.getParameterCount(),
        "localVariableCount": len(local_vars),
    }
    return {"success": True, "result": details}
|
|
|
|
# -- Patch (rename / comment) --
|
|
|
|
def _handle_patch_function_impl(self, exchange, identifier, by_name):
    """Rename and/or re-comment a function from a JSON request body.

    The function is resolved through ``_get_function(identifier, by_name)``.
    The body may carry ``name`` (new function name, applied with
    ``SourceType.USER_DEFINED``) and/or ``comment``; at least one of them
    is required. Both changes are made inside one "Patch function"
    transaction. Exceptions raised by the transaction are not caught here.
    """
    if not self.program:
        return self._no_program()

    func = self._get_function(identifier, by_name)
    if not func:
        failure = {
            "code": "FUNCTION_NOT_FOUND",
            "message": "Function not found: %s" % identifier,
        }
        return {"success": False, "error": failure}

    payload = parse_json_body(exchange)
    new_name = payload.get("name")
    comment = payload.get("comment")

    wants_rename = bool(new_name)
    wants_comment = comment is not None
    if not (wants_rename or wants_comment):
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide 'name' and/or 'comment' in request body"}}

    result = {
        "address": str(func.getEntryPoint()),
        "oldName": func.getName(),
    }

    def do_patch():
        # Runs inside the transaction; records each applied change in `result`.
        if wants_rename:
            func.setName(new_name, SourceType.USER_DEFINED)
            result["newName"] = new_name
        if wants_comment:
            func.setComment(comment)
            result["comment"] = comment

    with_transaction(self.program, "Patch function", do_patch)
    result["message"] = "Function updated successfully"
    return {"success": True, "result": result}
|
|
|
|
def handle_patch_function(self, exchange, addr_str):
    """Patch (rename / comment) the function identified by address."""
    return self._handle_patch_function_impl(exchange, addr_str, False)
|
|
|
|
def handle_patch_function_by_name(self, exchange, name):
    """Patch (rename / comment) the function identified by name."""
    return self._handle_patch_function_impl(exchange, name, True)
|
|
|
|
# -- Create function --
|
|
|
|
def handle_create_function(self, exchange):
    """Create a function at the ``address`` given in the JSON request body.

    Returns a ``(response, 201)`` tuple on success. Otherwise returns a
    plain error dict: ``MISSING_PARAMETER`` (no address),
    ``INVALID_ADDRESS`` (address lookup raised or returned None),
    ``CREATE_FAILED`` (no function came back from the function manager) or
    ``CREATE_ERROR`` (anything raised during the transaction).
    """
    if not self.program:
        return self._no_program()

    payload = parse_json_body(exchange)
    addr_str = payload.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' in request body"}}

    try:
        entry = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

    # getAddress() can hand back None instead of raising.
    if entry is None:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}

    created = [None]  # one-slot holder so the nested callback can pass its result out

    def do_create():
        manager = self.program.getFunctionManager()
        created[0] = manager.createFunction(None, entry, None, SourceType.USER_DEFINED)

    try:
        with_transaction(self.program, "Create function", do_create)
        new_func = created[0]
        if not new_func:
            return {"success": False, "error": {"code": "CREATE_FAILED", "message": "Failed to create function at %s" % addr_str}}
        summary = {
            "name": new_func.getName(),
            "address": str(new_func.getEntryPoint()),
            "message": "Function created successfully",
        }
        return ({"success": True, "result": summary}, 201)
    except:
        # Bare except on purpose: Java exceptions are not Python Exception subclasses.
        import sys
        caught = sys.exc_info()[1]
        try:
            detail = str(caught)
        except:
            detail = "Failed to create function"
        return {"success": False, "error": {"code": "CREATE_ERROR", "message": detail}}
|
|
|
|
# -- Signature --
|
|
|
|
def _handle_signature_impl(self, exchange, identifier, by_name):
    """Apply a C-style signature from the JSON request body to a function.

    The function is resolved through ``_get_function(identifier, by_name)``
    and the body must carry a ``signature`` string. The string (with ``;``
    appended) is parsed by Ghidra's ``CParser`` and applied through
    ``ApplyFunctionSignatureCmd`` inside one transaction.

    Returns a success envelope only when the signature was really applied.
    ``SIGNATURE_ERROR`` is returned when the text does not parse to a
    function definition, when the apply command reports failure, or when
    anything raises along the way.
    """
    if not self.program:
        return self._no_program()
    func = self._get_function(identifier, by_name)
    if not func:
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % identifier}}

    body = parse_json_body(exchange)
    sig_str = body.get("signature")
    if not sig_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'signature' in request body"}}

    addr = str(func.getEntryPoint())
    try:
        from ghidra.app.util.cparser.C import CParser
        from ghidra.program.model.data import FunctionDefinitionDataType
        from ghidra.app.cmd.function import ApplyFunctionSignatureCmd

        dtm = self.program.getDataTypeManager()
        parsed = CParser(dtm).parse(sig_str + ";")

        # Anything other than a function definition cannot be applied;
        # report it instead of claiming success without changing anything.
        if not isinstance(parsed, FunctionDefinitionDataType):
            return {"success": False, "error": {
                "code": "SIGNATURE_ERROR",
                "message": "Could not parse signature: not a function declaration: %s" % sig_str,
            }}

        cmd = ApplyFunctionSignatureCmd(func.getEntryPoint(), parsed, SourceType.USER_DEFINED)
        applied = [False]  # one-slot holder so the nested callback can pass its result out

        def do_set_sig():
            applied[0] = cmd.applyTo(self.program, getMonitor())

        with_transaction(self.program, "Set function signature", do_set_sig)

        if not applied[0]:
            status = cmd.getStatusMsg()
            return {"success": False, "error": {
                "code": "SIGNATURE_ERROR",
                "message": "Could not apply signature: %s" % (status or sig_str),
            }}

        return {"success": True, "result": {
            "address": addr,
            "signature": sig_str,
            "message": "Signature updated",
        }}
    except:
        # Bare except on purpose: Java exceptions are not Python Exception subclasses.
        import sys
        exc = sys.exc_info()[1]
        try:
            msg = str(exc)
        except:
            msg = "unknown error"
        return {"success": False, "error": {"code": "SIGNATURE_ERROR", "message": "Could not parse signature: %s" % msg}}
|
|
|
|
def handle_signature(self, exchange, addr_str):
    """Set the signature of the function identified by address."""
    return self._handle_signature_impl(exchange, addr_str, False)
|
|
|
|
def handle_signature_by_name(self, exchange, name):
    """Set the signature of the function identified by name."""
    return self._handle_signature_impl(exchange, name, True)
|
|
|
|
# ==================================================================
|
|
# Data Handlers
|
|
# ==================================================================
|
|
|
|
def handle_data_list(self, exchange):
    """List defined data items, or look up the single item at ``addr``.

    Query parameters: ``limit`` (default 100), ``offset`` (default 0),
    ``addr`` (exact-address lookup), ``name`` (exact primary-symbol name),
    ``name_contains`` and ``type`` (case-insensitive substrings), plus
    whatever ``compile_grep`` reads from the parameters.

    With ``addr`` set, the response holds zero or one item and any error
    during the lookup yields an empty result. Otherwise the defined data is
    walked in order; the offset is consumed before the grep check, so items
    rejected by grep still count toward the offset. Displayed values longer
    than 200 characters are cut and suffixed with ``...``.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    addr_filter = query.get("addr")
    name_filter = query.get("name")
    name_contains = query.get("name_contains")
    type_filter = query.get("type")
    grep_pattern = compile_grep(query)

    def clipped_value(datum):
        # Display form of the value, capped at 200 characters.
        text = datum.getDefaultValueRepresentation()
        if text and len(text) > 200:
            return text[:200] + "..."
        return text

    # Single address lookup
    if addr_filter:
        nothing = {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit}
        try:
            target = self.program.getAddressFactory().getAddress(addr_filter)
            hit = self.program.getListing().getDataAt(target)
            if not hit:
                return nothing
            shown = clipped_value(hit)
            entry = {
                "address": str(hit.getAddress()),
                "type": hit.getDataType().getName(),
                "length": hit.getLength(),
                "value": shown,
            }
            label = self.program.getSymbolTable().getPrimarySymbol(target)
            if label:
                entry["name"] = label.getName()
            return {"success": True, "result": [entry], "size": 1, "offset": 0, "limit": 1}
        except:
            return nothing

    collected = []
    listing = self.program.getListing()
    symbols = self.program.getSymbolTable()
    accepted = 0
    passed_over = 0

    for datum in listing.getDefinedData(True):
        if accepted >= limit:
            break

        # Type filter
        if type_filter and type_filter.lower() not in datum.getDataType().getName().lower():
            continue

        # Name filters (require symbol lookup)
        if name_filter or name_contains:
            label = symbols.getPrimarySymbol(datum.getAddress())
            label_name = label.getName() if label else ""
            if name_filter and label_name != name_filter:
                continue
            if name_contains and name_contains.lower() not in label_name.lower():
                continue

        if passed_over < offset:
            passed_over += 1
            continue

        shown = clipped_value(datum)
        entry = {
            "address": str(datum.getAddress()),
            "type": datum.getDataType().getName(),
            "length": datum.getLength(),
            "value": shown,
        }
        label = symbols.getPrimarySymbol(datum.getAddress())
        if label:
            entry["name"] = label.getName()
        entry["_links"] = {"self": make_link("/data/%s" % str(datum.getAddress()))}

        if grep_matches_item(entry, grep_pattern):
            collected.append(entry)
            accepted += 1

    return {
        "success": True,
        "result": collected,
        "offset": offset,
        "limit": limit,
    }
|
|
|
|
def handle_data_create(self, exchange):
    """Create a label or a typed data item at the body's ``address``.

    If the JSON body carries ``newName``, a user-defined label is created
    and a plain success dict is returned. Otherwise ``type`` is required:
    it is resolved with ``resolve_data_type`` and data of that type is
    created at the address, returning a ``(response, 201)`` tuple.

    Error codes: ``MISSING_PARAMETER``, ``INVALID_ADDRESS``,
    ``UNKNOWN_TYPE``, ``LABEL_ERROR`` and ``DATA_ERROR``.
    """
    if not self.program:
        return self._no_program()

    payload = parse_json_body(exchange)
    addr_str = payload.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}}

    try:
        target = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

    # getAddress() can hand back None instead of raising.
    if target is None:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}

    # Label creation (newName field)
    new_name = payload.get("newName")
    if new_name:
        def do_label():
            table = self.program.getSymbolTable()
            table.createLabel(target, new_name, SourceType.USER_DEFINED)

        try:
            with_transaction(self.program, "Create label", do_label)
            created = {"address": addr_str, "name": new_name, "message": "Label created"}
            return {"success": True, "result": created}
        except:
            # Bare except on purpose: Java exceptions are not Python Exception subclasses.
            import sys
            caught = sys.exc_info()[1]
            try:
                detail = str(caught)
            except:
                detail = "Failed to create label"
            return {"success": False, "error": {"code": "LABEL_ERROR", "message": detail}}

    # Data creation (type field)
    type_name = payload.get("type")
    if not type_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'type' or 'newName'"}}

    resolved = resolve_data_type(self.program.getDataTypeManager(), type_name)
    if not resolved:
        return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}}

    def do_create_data():
        from ghidra.program.model.data import DataUtilities
        clear_mode = DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA
        DataUtilities.createData(self.program, target, resolved, -1, False, clear_mode)

    try:
        with_transaction(self.program, "Create data", do_create_data)
        created = {"address": addr_str, "type": type_name, "message": "Data created"}
        return ({"success": True, "result": created}, 201)
    except:
        import sys
        caught = sys.exc_info()[1]
        try:
            detail = str(caught)
        except:
            detail = "Failed to create data"
        return {"success": False, "error": {"code": "DATA_ERROR", "message": detail}}
|
|
|
|
def handle_data_delete(self, exchange):
    """Clear the code unit(s) at the ``address`` given in the JSON body.

    Runs ``clearCodeUnits(addr, addr, False)`` inside a "Delete data"
    transaction.

    Error codes: ``MISSING_PARAMETER`` (no address), ``INVALID_ADDRESS``
    (address lookup raised or returned None) and ``DELETE_ERROR`` (anything
    raised during the transaction, Java exceptions included).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    addr_str = body.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

    # getAddress() can hand back None instead of raising; stop here rather
    # than pass None into the listing.
    if addr is None:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}

    def do_delete():
        self.program.getListing().clearCodeUnits(addr, addr, False)

    try:
        with_transaction(self.program, "Delete data", do_delete)
        return {"success": True, "result": {"address": addr_str, "message": "Data deleted"}}
    except:
        # Bare except on purpose: Java exceptions are not Python Exception subclasses.
        import sys
        exc = sys.exc_info()[1]
        try:
            msg = str(exc)
        except:
            msg = "Failed to delete data"
        return {"success": False, "error": {"code": "DELETE_ERROR", "message": msg}}
|
|
|
|
def handle_data_type(self, exchange):
    """Change data type at an address (clear + recreate).

    The JSON body must carry ``address`` and ``type``. The type is resolved
    with ``resolve_data_type``; then, inside one "Change data type"
    transaction, the code unit at the address is cleared and data of the
    new type is created there.

    Error codes: ``MISSING_PARAMETER``, ``INVALID_ADDRESS`` (address lookup
    raised or returned None), ``UNKNOWN_TYPE`` and ``TYPE_ERROR`` (anything
    raised during the transaction, Java exceptions included).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    addr_str = body.get("address")
    type_name = body.get("type")
    if not addr_str or not type_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' and/or 'type'"}}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

    # getAddress() can hand back None instead of raising.
    if addr is None:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}

    dtm = self.program.getDataTypeManager()
    dt = resolve_data_type(dtm, type_name)
    if not dt:
        return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}}

    def do_retype():
        from ghidra.program.model.data import DataUtilities
        self.program.getListing().clearCodeUnits(addr, addr, False)
        DataUtilities.createData(
            self.program, addr, dt, -1, False,
            DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA
        )

    try:
        with_transaction(self.program, "Change data type", do_retype)
        return {"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data type changed"}}
    except:
        # Bare except on purpose: Java exceptions are not Python Exception subclasses.
        import sys
        exc = sys.exc_info()[1]
        try:
            msg = str(exc)
        except:
            msg = "Failed to change data type"
        return {"success": False, "error": {"code": "TYPE_ERROR", "message": msg}}
|
|
|
|
# ==================================================================
|
|
# Strings Handler
|
|
# ==================================================================
|
|
|
|
def handle_strings(self, exchange):
    """List defined string data with its incoming cross-reference count.

    Query parameters: ``limit`` (default 2000), ``offset`` (default 0),
    ``filter`` (case-insensitive substring of the value), ``min_length``
    (default 2), plus whatever ``compile_grep`` reads from the parameters.

    A data item qualifies when its type name contains "string" or is
    exactly ``char`` / ``wchar`` (compared in lower case) and it has a
    truthy value. Reported values are cut to 200 characters while
    ``length`` is the full length. Any error while processing one item
    silently skips that item.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 2000)
    offset = parse_int(query.get("offset"), 0)
    filter_str = query.get("filter")
    min_length = parse_int(query.get("min_length"), 2)
    grep_pattern = compile_grep(query)

    found = []
    listing = self.program.getListing()
    ref_mgr = self.program.getReferenceManager()
    accepted = 0
    passed_over = 0

    for datum in listing.getDefinedData(True):
        if accepted >= limit:
            break
        try:
            dtype = datum.getDataType()
            if not dtype:
                continue
            lowered = dtype.getName().lower()
            if not ("string" in lowered or lowered in ("char", "wchar")):
                continue

            raw = datum.getValue()
            if not raw:
                continue
            text = str(raw)
            if len(text) < min_length:
                continue
            if filter_str and filter_str.lower() not in text.lower():
                continue

            if passed_over < offset:
                passed_over += 1
                continue

            where = datum.getAddress()
            entry = {
                "address": str(where),
                "value": text[:200],
                "length": len(text),
                "type": dtype.getName(),
            }

            # Xref count
            incoming = ref_mgr.getReferencesTo(where)
            xref_total = 0
            while incoming.hasNext():
                incoming.next()
                xref_total += 1
            entry["xrefCount"] = xref_total

            # Label name
            label = self.program.getSymbolTable().getPrimarySymbol(where)
            if label:
                entry["name"] = label.getName()

            if grep_matches_item(entry, grep_pattern):
                found.append(entry)
                accepted += 1
        except:
            # Best effort: an item that cannot be rendered is left out.
            pass

    return {
        "success": True,
        "result": found,
        "size": len(found),
        "offset": offset,
        "limit": limit,
    }
|
|
|
|
# ==================================================================
|
|
# Memory Handlers
|
|
# ==================================================================
|
|
|
|
def handle_memory_read(self, exchange):
    """Read up to 4096 bytes of program memory.

    Query parameters: ``address`` (required), ``length`` (default 16, must
    be 1-4096) and ``format`` (``hex`` by default, or ``base64`` /
    ``string``). The ``string`` format renders bytes 32-126 as characters
    and everything else as ``.``. ``hexBytes`` and ``rawBytes`` always
    carry the hex rendering, whatever format was asked for.

    Error codes: ``MISSING_PARAMETER``, ``INVALID_PARAMETER``,
    ``INVALID_ADDRESS`` (address lookup raised or returned None) and
    ``MEMORY_ERROR`` (anything raised while reading or formatting, Java
    exceptions included).
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    addr_str = params.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}}

    length = parse_int(params.get("length"), 16)
    fmt = params.get("format", "hex")
    if length <= 0 or length > 4096:
        return {"success": False, "error": {"code": "INVALID_PARAMETER", "message": "Length must be 1-4096"}}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

    # getAddress() can hand back None instead of raising.
    if addr is None:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}

    try:
        mem = self.program.getMemory()
        from jarray import zeros
        byte_array = zeros(length, 'b')
        bytes_read = mem.getBytes(addr, byte_array)

        # Rendered once; reused for "bytes" (hex format), "hexBytes" and "rawBytes".
        hex_text = bytes_to_hex(byte_array[:bytes_read])

        if fmt == "base64":
            from java.util import Base64
            # Convert to unsigned bytes for Base64
            unsigned = bytearray([(b & 0xff) for b in byte_array[:bytes_read]])
            formatted = Base64.getEncoder().encodeToString(unsigned)
        elif fmt == "string":
            chars = []
            for i in range(bytes_read):
                b = byte_array[i] & 0xff
                if 32 <= b < 127:
                    chars.append(chr(b))
                else:
                    chars.append('.')
            formatted = ''.join(chars)
        else:
            formatted = hex_text

        return {
            "success": True,
            "result": {
                "address": str(addr),
                "bytesRead": bytes_read,
                "format": fmt,
                "bytes": formatted,
                "hexBytes": hex_text,
                "rawBytes": hex_text,
            }
        }
    except:
        # Bare except on purpose: Java exceptions are not Python Exception subclasses.
        import sys
        exc = sys.exc_info()[1]
        try:
            msg = str(exc)
        except:
            msg = "Failed to read memory"
        return {"success": False, "error": {"code": "MEMORY_ERROR", "message": msg}}
|
|
|
|
def handle_memory_write(self, exchange, addr_str):
    """Write bytes from the JSON request body to memory at ``addr_str``.

    The body must carry ``bytes``; ``format`` says how to decode it:
    ``hex`` (default, via ``hex_to_bytes``), ``base64`` or ``string`` (one
    byte per character via ``ord``). The write runs inside a
    "Write memory" transaction.

    Error codes: ``MISSING_PARAMETER``, ``INVALID_ADDRESS`` (address lookup
    raised or returned None) and ``MEMORY_ERROR`` (anything raised while
    decoding or writing, Java exceptions included).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    bytes_str = body.get("bytes")
    fmt = body.get("format", "hex")

    if not bytes_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'bytes'"}}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

    # getAddress() can hand back None instead of raising.
    if addr is None:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}

    try:
        if fmt == "base64":
            from java.util import Base64
            byte_vals = list(Base64.getDecoder().decode(bytes_str))
        elif fmt == "string":
            byte_vals = [ord(c) for c in bytes_str]
        else:
            byte_vals = hex_to_bytes(bytes_str)

        # Convert to Java byte array (signed: 128..255 map to -128..-1)
        from jarray import zeros
        java_bytes = zeros(len(byte_vals), 'b')
        for i, b in enumerate(byte_vals):
            java_bytes[i] = b if b <= 127 else b - 256

        def do_write():
            self.program.getMemory().setBytes(addr, java_bytes)

        with_transaction(self.program, "Write memory", do_write)
        return {"success": True, "result": {"address": str(addr), "bytesWritten": len(byte_vals)}}
    except:
        # Bare except on purpose: Java exceptions are not Python Exception subclasses.
        import sys
        exc = sys.exc_info()[1]
        try:
            msg = str(exc)
        except:
            msg = "Failed to write memory"
        return {"success": False, "error": {"code": "MEMORY_ERROR", "message": msg}}
|
|
|
|
def handle_memory_blocks(self, exchange):
    """List memory blocks (alias for /segments).

    Delegates straight to ``handle_segments`` and returns its response.
    """
    response = self.handle_segments(exchange)
    return response
|
|
|
|
def handle_get_comment(self, exchange, addr_str, comment_type):
    """Get a comment at a specific address.

    ``comment_type`` is looked up (lower-cased) in ``COMMENT_TYPE_MAP``.
    The code unit starting at the address is used, falling back to the one
    containing it. When no code unit exists the call still succeeds, with
    ``comment`` set to None.

    Error codes: ``INVALID_COMMENT_TYPE``, ``INVALID_ADDRESS`` (address
    lookup raised or returned None) and ``COMMENT_ERROR`` (anything raised
    while reading the comment, Java exceptions included).
    """
    if not self.program:
        return self._no_program()

    ct = COMMENT_TYPE_MAP.get(comment_type.lower())
    if ct is None:
        return {"success": False, "error": {
            "code": "INVALID_COMMENT_TYPE",
            "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type
        }}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

    # getAddress() can hand back None instead of raising.
    if addr is None:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}

    try:
        listing = self.program.getListing()
        cu = listing.getCodeUnitAt(addr)
        if not cu:
            cu = listing.getCodeUnitContaining(addr)
        if not cu:
            return {"success": True, "result": {"address": addr_str, "commentType": comment_type, "comment": None}}

        comment = cu.getComment(ct)
        return {"success": True, "result": {
            "address": addr_str,
            "commentType": comment_type,
            "comment": comment,
        }}
    except:
        # Bare except on purpose: Java exceptions are not Python Exception subclasses.
        import sys
        exc = sys.exc_info()[1]
        try:
            msg = str(exc)
        except:
            msg = "Failed to get comment"
        return {"success": False, "error": {"code": "COMMENT_ERROR", "message": msg}}
|
|
|
|
def handle_set_comment(self, exchange, addr_str, comment_type):
    """Set a comment at a specific address.

    The JSON body's ``comment`` (default ``""``) is the new text; a falsy
    value is passed to ``setComment`` as None. ``comment_type`` is looked
    up (lower-cased) in ``COMMENT_TYPE_MAP``. The code unit starting at the
    address is used, falling back to the one containing it. The change runs
    inside a "Set comment" transaction.

    Error codes: ``INVALID_COMMENT_TYPE``, ``INVALID_ADDRESS`` (address
    lookup raised or returned None), ``NO_CODE_UNIT`` and
    ``COMMENT_ERROR`` (anything raised while writing, Java exceptions
    included).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    comment_text = body.get("comment", "")

    ct = COMMENT_TYPE_MAP.get(comment_type.lower())
    if ct is None:
        return {"success": False, "error": {
            "code": "INVALID_COMMENT_TYPE",
            "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type
        }}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
    except:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}}

    # getAddress() can hand back None instead of raising.
    if addr is None:
        return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % addr_str}}

    try:
        listing = self.program.getListing()
        cu = listing.getCodeUnitAt(addr)
        if not cu:
            cu = listing.getCodeUnitContaining(addr)
        if not cu:
            return {"success": False, "error": {"code": "NO_CODE_UNIT", "message": "No code unit at address: %s" % addr_str}}

        def do_comment():
            cu.setComment(ct, comment_text if comment_text else None)

        with_transaction(self.program, "Set comment", do_comment)
        return {"success": True, "result": {
            "address": addr_str,
            "commentType": comment_type,
            "comment": comment_text,
            "message": "Comment set successfully",
        }}
    except:
        # Bare except on purpose: Java exceptions are not Python Exception subclasses.
        import sys
        exc = sys.exc_info()[1]
        try:
            msg = str(exc)
        except:
            msg = "Failed to set comment"
        return {"success": False, "error": {"code": "COMMENT_ERROR", "message": msg}}
|
|
|
|
# ==================================================================
|
|
# Segments Handler
|
|
# ==================================================================
|
|
|
|
def handle_segments(self, exchange):
    """List the program's memory blocks.

    The optional ``name`` query parameter keeps only blocks whose name
    contains it (case-sensitive substring). Each entry reports the block's
    range, size, permission flags and initialization state, plus links to
    the segment itself and to a 1024-byte memory read at its start.
    """
    if not self.program:
        return self._no_program()

    name_filter = parse_query_params(exchange).get("name")

    segments = []
    for block in self.program.getMemory().getBlocks():
        if name_filter and name_filter not in block.getName():
            continue
        links = {
            "self": make_link("/segments/%s" % block.getName()),
            "memory": make_link("/memory?address=%s&length=1024" % str(block.getStart())),
        }
        entry = {
            "name": block.getName(),
            "start": str(block.getStart()),
            "end": str(block.getEnd()),
            "size": block.getSize(),
            "readable": block.isRead(),
            "writable": block.isWrite(),
            "executable": block.isExecute(),
            "initialized": block.isInitialized(),
            "_links": links,
        }
        segments.append(entry)

    return {"success": True, "result": segments}
|
|
|
|
# ==================================================================
|
|
# Symbol Handlers
|
|
# ==================================================================
|
|
|
|
def handle_symbols(self, exchange):
    """List symbols from the program's symbol table.

    Query parameters: ``limit`` (default 100), ``offset`` (default 0),
    ``name`` (case-insensitive substring of the symbol name), ``type``
    (case-insensitive exact match on the symbol type's string form), plus
    whatever ``compile_grep`` reads from the parameters. The offset is
    consumed before the grep check, so items rejected by grep still count
    toward the offset.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    name_filter = query.get("name")
    type_filter = query.get("type")
    grep_pattern = compile_grep(query)

    collected = []
    table = self.program.getSymbolTable()
    accepted = 0
    passed_over = 0

    for sym in table.getAllSymbols(True):
        if accepted >= limit:
            break
        if name_filter and name_filter.lower() not in sym.getName().lower():
            continue
        if type_filter and str(sym.getSymbolType()).lower() != type_filter.lower():
            continue
        if passed_over < offset:
            passed_over += 1
            continue

        entry = {
            "name": sym.getName(),
            "address": str(sym.getAddress()),
            "namespace": sym.getParentNamespace().getName(),
            "type": str(sym.getSymbolType()),
            "isPrimary": sym.isPrimary(),
            "isExternal": sym.isExternal(),
        }
        if grep_matches_item(entry, grep_pattern):
            collected.append(entry)
            accepted += 1

    links = {
        "self": make_link("/symbols"),
        "imports": make_link("/symbols/imports"),
        "exports": make_link("/symbols/exports"),
    }
    return {
        "success": True,
        "result": collected,
        "offset": offset,
        "limit": limit,
        "_links": links,
    }
|
|
|
|
def handle_imports(self, exchange):
    """List the symbol table's external symbols (imports).

    Query parameters: ``limit`` (default 100), ``offset`` (default 0),
    plus whatever ``compile_grep`` reads from the parameters. The offset is
    consumed before the grep check.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    grep_pattern = compile_grep(query)

    collected = []
    accepted = 0
    passed_over = 0
    for sym in self.program.getSymbolTable().getExternalSymbols():
        if accepted >= limit:
            break
        if passed_over < offset:
            passed_over += 1
            continue
        entry = {
            "name": sym.getName(),
            "address": str(sym.getAddress()),
            "namespace": sym.getParentNamespace().getName(),
        }
        if grep_matches_item(entry, grep_pattern):
            collected.append(entry)
            accepted += 1

    return {"success": True, "result": collected, "offset": offset, "limit": limit}
|
|
|
|
def handle_exports(self, exchange):
    """List symbols flagged as external entry points (exports).

    Query parameters: ``limit`` (default 100), ``offset`` (default 0),
    plus whatever ``compile_grep`` reads from the parameters. Only symbols
    for which ``isExternalEntryPoint()`` is true are considered; the offset
    is consumed before the grep check.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    grep_pattern = compile_grep(query)

    collected = []
    accepted = 0
    passed_over = 0
    for sym in self.program.getSymbolTable().getAllSymbols(True):
        if accepted >= limit:
            break
        if not sym.isExternalEntryPoint():
            continue
        if passed_over < offset:
            passed_over += 1
            continue
        entry = {
            "name": sym.getName(),
            "address": str(sym.getAddress()),
        }
        if grep_matches_item(entry, grep_pattern):
            collected.append(entry)
            accepted += 1

    return {"success": True, "result": collected, "offset": offset, "limit": limit}
|
|
|
|
# ==================================================================
|
|
# Cross-References Handler
|
|
# ==================================================================
|
|
|
|
def handle_xrefs(self, exchange):
    """List cross-references to and/or from an address.

    Query parameters: ``to_addr`` and/or ``from_addr`` (at least one is
    required), ``type`` (case-insensitive exact match on the reference type
    name), ``limit`` (default 100), ``offset`` (default 0), plus whatever
    ``compile_grep`` reads from the parameters. When both addresses are
    given, limit and offset apply to each direction separately and the
    results are concatenated, "to" references first.

    Error codes: ``MISSING_PARAMETER``, ``INVALID_ADDRESS`` (address lookup
    returned None) and ``XREF_ERROR`` (anything raised during the lookup,
    Java exceptions included).
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    to_addr_str = params.get("to_addr")
    from_addr_str = params.get("from_addr")
    type_filter = params.get("type")
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    grep_pattern = compile_grep(params)

    if not to_addr_str and not from_addr_str:
        return {"success": False, "error": {
            "code": "MISSING_PARAMETER",
            "message": "Either 'to_addr' or 'from_addr' parameter is required"
        }}

    xrefs = []
    ref_mgr = self.program.getReferenceManager()
    wanted_type = type_filter.lower() if type_filter else None

    def collect(ref, tally):
        # Filter, paginate and append one reference; tally is [accepted, skipped].
        if wanted_type and self._get_ref_type_name(ref.getReferenceType()).lower() != wanted_type:
            return
        if tally[1] < offset:
            tally[1] += 1
            return
        item = self._build_xref_info(ref)
        if not grep_matches_item(item, grep_pattern):
            return
        xrefs.append(item)
        tally[0] += 1

    try:
        factory = self.program.getAddressFactory()

        if to_addr_str:
            addr = factory.getAddress(to_addr_str)
            # getAddress() can hand back None instead of raising.
            if addr is None:
                return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % to_addr_str}}
            refs = ref_mgr.getReferencesTo(addr)
            tally = [0, 0]
            while refs.hasNext() and tally[0] < limit:
                collect(refs.next(), tally)

        if from_addr_str:
            addr = factory.getAddress(from_addr_str)
            if addr is None:
                return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Could not parse address: %s" % from_addr_str}}
            tally = [0, 0]
            for ref in ref_mgr.getReferencesFrom(addr):
                if tally[0] >= limit:
                    break
                collect(ref, tally)

        return {"success": True, "result": xrefs, "offset": offset, "limit": limit}
    except:
        # Bare except on purpose: Java exceptions are not Python Exception subclasses.
        import sys
        exc = sys.exc_info()[1]
        try:
            msg = str(exc)
        except:
            msg = "Failed to list cross-references"
        return {"success": False, "error": {"code": "XREF_ERROR", "message": msg}}
|
|
|
|
# ==================================================================
|
|
# Classes & Namespaces Handlers
|
|
# ==================================================================
|
|
|
|
def handle_classes(self, exchange):
    """List the non-global namespaces that own symbols, as class entries.

    Collects the fully qualified name of every symbol's parent namespace
    that is not global and whose own symbol type reports
    ``isNamespace()``, sorts the names, and returns the
    ``offset``/``limit`` window (defaults 0 and 100). Each entry splits the
    name at its last ``::`` into ``namespace`` and ``simpleName``; names
    without ``::`` get the namespace ``"global"``. ``size`` is the total
    number of names before paging.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)

    qualified = set()
    for sym in self.program.getSymbolTable().getAllSymbols(True):
        owner = sym.getParentNamespace()
        if not owner or owner.isGlobal():
            continue
        if owner.getSymbol().getSymbolType().isNamespace():
            qualified.add(owner.getName(True))

    ordered = sorted(qualified)
    total = len(ordered)
    first = min(offset, total)
    last = min(offset + limit, total)

    classes = []
    for full_name in ordered[first:last]:
        if "::" in full_name:
            parent, _sep, simple = full_name.rpartition("::")
        else:
            parent, simple = "global", full_name
        classes.append({"name": full_name, "namespace": parent, "simpleName": simple})

    return {
        "success": True,
        "result": classes,
        "size": total,
        "offset": offset,
        "limit": limit,
    }
|
|
|
|
def handle_namespaces(self, exchange):
    """List the fully qualified names of non-global namespaces owning symbols.

    The names are de-duplicated and sorted, then the ``offset``/``limit``
    window (defaults 0 and 100) is returned. ``size`` is the total number
    of names before paging.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)

    qualified = set()
    for sym in self.program.getSymbolTable().getAllSymbols(True):
        owner = sym.getParentNamespace()
        if not owner or owner.isGlobal():
            continue
        qualified.add(owner.getName(True))

    ordered = sorted(qualified)
    total = len(ordered)
    first = min(offset, total)
    last = min(offset + limit, total)

    return {
        "success": True,
        "result": ordered[first:last],
        "size": total,
        "offset": offset,
        "limit": limit,
    }
|
|
|
|
# ==================================================================
|
|
# Structs Handlers
|
|
# ==================================================================
|
|
|
|
def handle_structs(self, exchange):
    """List structure and union data types, or show one in detail.

    With a ``name`` query parameter the call is handed to
    ``_handle_struct_detail``. Otherwise every data type in the program's
    data type manager is walked and Structure/Union instances are listed.

    Listing parameters: ``limit`` (default 100), ``offset`` (default 0),
    ``category`` (case-insensitive substring of the category path), plus
    whatever ``compile_grep`` reads from the parameters. The offset is
    consumed before the grep check.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)

    # Detail lookup by name
    wanted = query.get("name")
    if wanted:
        return self._handle_struct_detail(wanted)

    # List structs
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    category_filter = query.get("category")
    grep_pattern = compile_grep(query)

    from ghidra.program.model.data import Structure, Union

    collected = []
    accepted = 0
    passed_over = 0

    all_types = self.program.getDataTypeManager().getAllDataTypes()
    while all_types.hasNext() and accepted < limit:
        dtype = all_types.next()
        if not isinstance(dtype, (Structure, Union)):
            continue
        if category_filter and category_filter.lower() not in dtype.getCategoryPath().getPath().lower():
            continue
        if passed_over < offset:
            passed_over += 1
            continue

        if isinstance(dtype, Structure):
            kind = "struct"
        else:
            kind = "union"
        entry = {
            "name": dtype.getName(),
            "category": dtype.getCategoryPath().getPath(),
            "path": dtype.getPathName(),
            "size": dtype.getLength(),
            "type": kind,
            "numFields": dtype.getNumComponents(),
            "_links": {"self": make_link("/structs?name=%s" % dtype.getName())},
        }
        if grep_matches_item(entry, grep_pattern):
            collected.append(entry)
            accepted += 1

    return {"success": True, "result": collected, "offset": offset, "limit": limit}
|
|
|
|
def _handle_struct_detail(self, name):
    """Describe the data type that ``_find_struct`` returns for ``name``.

    Returns a ``STRUCT_NOT_FOUND`` error when nothing is found. The result
    always carries name, category, path, size and description; when the
    type is a ``Composite`` it also carries one record per component
    (``fields``) and their count (``numFields``). A field's ``comment`` key
    is present only when the component has a truthy comment.
    """
    from ghidra.program.model.data import Composite

    dtype = self._find_struct(name)
    if not dtype:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}}

    detail = {
        "name": dtype.getName(),
        "category": dtype.getCategoryPath().getPath(),
        "path": dtype.getPathName(),
        "size": dtype.getLength(),
        "description": dtype.getDescription() or "",
    }

    if isinstance(dtype, Composite):
        members = []
        for comp in dtype.getComponents():
            member_type = comp.getDataType()
            member = {
                "name": comp.getFieldName(),
                "type": member_type.getName(),
                "typePath": member_type.getPathName(),
                "offset": comp.getOffset(),
                "length": comp.getLength(),
            }
            note = comp.getComment()
            if note:
                member["comment"] = note
            members.append(member)
        detail["fields"] = members
        detail["numFields"] = len(members)

    detail["_links"] = {
        "self": make_link("/structs?name=%s" % name),
        "structs": make_link("/structs"),
    }
    return {"success": True, "result": detail}
|
|
|
|
def handle_struct_create(self, exchange):
    """Create a new, empty structure data type from a JSON request body.

    Body keys read: ``name`` (required), ``category`` (defaults to "/")
    and ``description`` (defaults to "").

    Returns a ``(envelope, 201)`` tuple on success, or a plain error
    envelope (MISSING_PARAMETER / STRUCT_ERROR) otherwise.
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    name = body.get("name")
    if not name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}}

    category = body.get("category", "/")
    description = body.get("description", "")

    try:
        from ghidra.program.model.data import StructureDataType, CategoryPath, DataTypeConflictHandler
        cat_path = CategoryPath(category)
        new_struct = StructureDataType(cat_path, name, 0)
        if description:
            new_struct.setDescription(description)

        # One-element list so the nested function can hand back the value
        # returned by addDataType (this file targets Python 2: no `nonlocal`).
        result_holder = [None]

        def do_create():
            dtm = self.program.getDataTypeManager()
            result_holder[0] = dtm.addDataType(new_struct, DataTypeConflictHandler.DEFAULT_HANDLER)

        with_transaction(self.program, "Create struct", do_create)
        dt = result_holder[0]
        if dt:
            created_name = dt.getName()
            created_path = dt.getPathName()
        else:
            # Fallback when nothing was handed back.  Strip trailing slashes
            # before joining so the root category "/" gives "/Name", not "//Name".
            created_name = name
            created_path = "%s/%s" % (category.rstrip("/"), name)
        return ({"success": True, "result": {
            "name": created_name,
            "path": created_path,
            "category": category,
            "size": 0,
            "message": "Struct created successfully",
        }}, 201)
    except Exception as e:
        return {"success": False, "error": {"code": "STRUCT_ERROR", "message": str(e)}}
|
|
|
|
def handle_struct_addfield(self, exchange):
    """Add a field to an existing struct.

    Body keys read: ``struct``, ``fieldName`` and ``fieldType`` (all
    required), plus optional ``offset`` and ``comment``.  With ``offset``
    the field goes through ``insertAtOffset``; without it, through ``add``.

    Returns a success envelope describing the new field, or an error
    envelope (MISSING_PARAMETER, STRUCT_NOT_FOUND, UNKNOWN_TYPE,
    FIELD_ERROR).
    """
    if not self.program:
        return self._no_program()

    payload = parse_json_body(exchange)
    struct_name = payload.get("struct")
    field_name = payload.get("fieldName")
    field_type = payload.get("fieldType")
    if not (struct_name and field_name and field_type):
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct', 'fieldName', or 'fieldType'"}}

    target = self._find_struct(struct_name)
    if not target:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}}

    member_type = resolve_data_type(self.program.getDataTypeManager(), field_type)
    if not member_type:
        return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % field_type}}

    requested_offset = payload.get("offset")
    note = payload.get("comment")
    # Filled in by the transaction body; the zeros are reported if it never
    # gets to assign them.
    placed = {"offset": 0, "length": 0, "structSize": 0}

    def do_add():
        if requested_offset is None:
            target.add(member_type, member_type.getLength(), field_name, note)
            # Derived from the struct length after the add.
            placed["offset"] = target.getLength() - member_type.getLength()
        else:
            target.insertAtOffset(int(requested_offset), member_type, member_type.getLength(), field_name, note)
            placed["offset"] = int(requested_offset)
        placed["length"] = member_type.getLength()
        placed["structSize"] = target.getLength()

    try:
        with_transaction(self.program, "Add struct field", do_add)
    except Exception as e:
        return {"success": False, "error": {"code": "FIELD_ERROR", "message": str(e)}}

    return {"success": True, "result": {
        "struct": struct_name,
        "fieldName": field_name,
        "fieldType": field_type,
        "offset": placed["offset"],
        "length": placed["length"],
        "structSize": placed["structSize"],
        "message": "Field added successfully",
    }}
|
|
|
|
def handle_struct_updatefield(self, exchange):
    """Rename, retype and/or re-comment one field of an existing struct.

    Body keys read: ``struct`` (required); ``fieldOffset`` or ``fieldName``
    to locate the field (the offset is used when both are given); and at
    least one of ``newName``, ``newType``, ``newComment``.

    Returns a success envelope with the original and new values, or an
    error envelope (MISSING_PARAMETER, STRUCT_NOT_FOUND, FIELD_NOT_FOUND,
    UNKNOWN_TYPE, UPDATE_ERROR).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    struct_name = body.get("struct")
    if not struct_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct'"}}

    struct = self._find_struct(struct_name)
    if not struct:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}}

    field_name = body.get("fieldName")
    field_offset = body.get("fieldOffset")
    new_name = body.get("newName")
    new_type = body.get("newType")
    new_comment = body.get("newComment")

    if not new_name and not new_type and new_comment is None:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide at least one of: newName, newType, newComment"}}

    # Find the component
    component = None
    if field_offset is not None:
        # A non-numeric offset becomes an error response instead of an
        # uncaught ValueError/TypeError escaping the handler.
        try:
            offset_value = int(field_offset)
        except (TypeError, ValueError):
            return {"success": False, "error": {"code": "UPDATE_ERROR", "message": "Invalid 'fieldOffset': %s" % (field_offset,)}}
        component = struct.getComponentAt(offset_value)
    elif field_name:
        for comp in struct.getComponents():
            if comp.getFieldName() == field_name:
                component = comp
                break

    if not component:
        return {"success": False, "error": {"code": "FIELD_NOT_FOUND", "message": "Field not found"}}

    # Resolve the replacement type before touching anything, so an unknown
    # type is reported rather than skipped while the response claims success.
    new_dt = None
    if new_type:
        new_dt = resolve_data_type(self.program.getDataTypeManager(), new_type)
        if not new_dt:
            return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % new_type}}

    orig_name = component.getFieldName()
    orig_type = component.getDataType().getName()
    orig_comment = component.getComment()

    def do_update():
        if new_name:
            component.setFieldName(new_name)
        if new_comment is not None:
            component.setComment(new_comment)
        if new_dt:
            component.setDataType(new_dt)

    try:
        with_transaction(self.program, "Update struct field", do_update)
        return {"success": True, "result": {
            "struct": struct_name,
            "offset": component.getOffset(),
            "originalName": orig_name,
            "originalType": orig_type,
            "originalComment": orig_comment,
            "newName": new_name or orig_name,
            "newType": new_type or orig_type,
            "newComment": new_comment if new_comment is not None else orig_comment,
            "length": component.getLength(),
            "message": "Field updated successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "UPDATE_ERROR", "message": str(e)}}
|
|
|
|
def handle_struct_delete(self, exchange):
    """Delete a struct (looked up by the ``name`` body key) from the program.

    Returns a success envelope echoing the removed type's name, path and
    category, or an error envelope (MISSING_PARAMETER, STRUCT_NOT_FOUND,
    DELETE_ERROR).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    name = body.get("name")
    if not name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}}

    dt = self._find_struct(name)
    if not dt:
        return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}}

    # Capture these before the removal; they are echoed in the response.
    category = dt.getCategoryPath().getPath()
    path = dt.getPathName()

    # One-element list so the nested function can report the outcome
    # (this file targets Python 2: no `nonlocal`).
    removed = [False]

    def do_delete():
        dtm = self.program.getDataTypeManager()
        removed[0] = dtm.remove(dt, getMonitor())

    try:
        with_transaction(self.program, "Delete struct", do_delete)
        if not removed[0]:
            # remove() signalled that nothing was deleted; do not claim success.
            return {"success": False, "error": {"code": "DELETE_ERROR", "message": "Struct could not be removed: %s" % name}}
        return {"success": True, "result": {
            "name": name,
            "path": path,
            "category": category,
            "message": "Struct deleted successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "DELETE_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Analysis Handlers
|
|
# ==================================================================
|
|
|
|
def handle_analysis_info(self, exchange):
    """Summarise the loaded program and advertise the analysis endpoints.

    The result carries the program name, processor, default address-space
    size, language string, a fixed list of analysis kinds and navigation
    links built with ``make_link``.
    """
    program = self.program
    if not program:
        return self._no_program()

    language = program.getLanguage()
    default_space = program.getAddressFactory().getDefaultAddressSpace()

    link_targets = (
        ("self", "/analysis"),
        ("callgraph", "/analysis/callgraph"),
        ("dataflow", "/analysis/dataflow"),
        ("program", "/program"),
    )
    links = {}
    for rel, path in link_targets:
        links[rel] = make_link(path)

    info = {
        "program": program.getName(),
        "processor": str(language.getProcessor()),
        "addressSize": default_space.getSize(),
        "programLanguage": str(language),
        "availableAnalysis": ["callgraph", "xrefs", "decompile", "dataflow"],
        "_links": links,
    }
    return {"success": True, "result": info}
|
|
|
|
def handle_analysis_run(self, exchange):
    """Trigger auto-analysis on the loaded program.

    Always answers with ``success: True``; ``analysis_triggered`` tells the
    caller whether the ``analyzeAll`` call actually went through.  When it
    raises, the exception text is folded into the message instead.
    """
    if not self.program:
        return self._no_program()
    try:
        # In headless mode, try to re-run auto-analysis
        analyzeAll(self.program)
    except Exception as e:
        # analyzeAll may not be available; graceful fallback
        return {"success": True, "result": {
            "program": self.program.getName(),
            "analysis_triggered": False,
            "message": "Analysis request received (headless mode: %s)" % str(e),
        }}
    return {"success": True, "result": {
        "program": self.program.getName(),
        "analysis_triggered": True,
        "message": "Analysis initiated on program",
    }}
|
|
|
|
def handle_callgraph(self, exchange):
    """Build a call graph rooted at a function.

    Query parameters read: ``address`` (takes precedence), ``name`` and
    ``max_depth`` (default 3).  With neither ``address`` nor ``name`` the
    root is the first external entry point that has a function at it.

    Returns a success envelope with ``nodes``/``edges`` lists and their
    counts, or a FUNCTION_NOT_FOUND error envelope.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    func_name = params.get("name")
    func_addr = params.get("address")
    max_depth = parse_int(params.get("max_depth"), 3)

    start_func = None
    if func_addr:
        start_func = self._find_function_at(func_addr)
    elif func_name:
        start_func = self._find_function_by_name(func_name)
    else:
        # No explicit start: walk the external entry points until one has a
        # function at it, rather than giving up when the first one does not.
        fm = self.program.getFunctionManager()
        entry_iter = self.program.getSymbolTable().getExternalEntryPointIterator()
        while start_func is None and entry_iter.hasNext():
            start_func = fm.getFunctionAt(entry_iter.next())

    if not start_func:
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found for callgraph"}}

    nodes = []
    edges = []
    visited = set()

    # Fills nodes/edges/visited in place.
    self._build_callgraph(start_func, nodes, edges, visited, 0, max_depth)

    graph = {
        "root": start_func.getName(),
        "rootAddress": str(start_func.getEntryPoint()),
        "maxDepth": max_depth,
        "nodes": nodes,
        "edges": edges,
        "nodeCount": len(nodes),
        "edgeCount": len(edges),
    }
    return {"success": True, "result": graph}
|
|
|
|
def _build_callgraph(self, func, nodes, edges, visited, depth, max_depth):
    """Recursively record ``func`` and the functions it calls.

    Appends one node per newly visited function to ``nodes`` and one edge
    per call reference to ``edges``; both lists and the ``visited`` set are
    mutated in place.  A function is expanded only once (keyed by its entry
    point string) and only while ``depth`` is below ``max_depth``.
    """
    entry = str(func.getEntryPoint())
    if entry in visited:
        return
    visited.add(entry)

    nodes.append({
        "id": entry,
        "name": func.getName(),
        "address": entry,
        "depth": depth,
    })

    if depth >= max_depth:
        return

    program = self.program
    references = program.getReferenceManager()

    for site in func.getBody().getAddresses(True):
        for ref in references.getReferencesFrom(site):
            if not ref.getReferenceType().isCall():
                continue
            callee = program.getFunctionManager().getFunctionAt(ref.getToAddress())
            if not callee:
                continue
            # The edge is recorded even when the callee was already visited;
            # the recursive call returns immediately in that case.
            edges.append({
                "from": entry,
                "to": str(callee.getEntryPoint()),
                "type": "call",
                "callSite": str(site),
            })
            self._build_callgraph(callee, nodes, edges, visited, depth + 1, max_depth)
|
|
|
|
def handle_dataflow(self, exchange):
    """List the instructions after (or before) an address within its function.

    Query parameters read: ``address`` (required), ``direction``
    ("forward" by default; any other value walks backwards) and
    ``max_steps`` (default 50).

    This is a simplified walk over the listing, not a real data-flow
    analysis: ``sources`` and ``sinks`` are always returned empty.

    Returns a success envelope with the collected steps, or an error
    envelope (MISSING_PARAMETER, FUNCTION_NOT_FOUND, DATAFLOW_ERROR).
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    addr_str = params.get("address")
    if not addr_str:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}}

    direction = params.get("direction", "forward")
    max_steps = parse_int(params.get("max_steps"), 50)

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        # getAddress() can hand back None for an unparseable address instead
        # of raising; stop here rather than passing None into the Ghidra API.
        if addr is None:
            return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": "Invalid address: %s" % addr_str}}

        func = self.program.getFunctionManager().getFunctionContaining(addr)
        if not func:
            return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function containing address: %s" % addr_str}}

        # Simplified dataflow: walk instructions from the starting address
        listing = self.program.getListing()
        steps = []
        step_count = 0

        inst_iter = listing.getInstructions(addr, direction == "forward")

        while inst_iter.hasNext() and step_count < max_steps:
            instr = inst_iter.next()
            # Stay within function body
            if not func.getBody().contains(instr.getAddress()):
                break
            full_repr = str(instr)
            steps.append({
                "address": str(instr.getAddress()),
                "instruction": full_repr,
                "mnemonic": instr.getMnemonicString(),
                "bytes": bytes_to_hex(instr.getBytes()),
            })
            step_count += 1

        return {
            "success": True,
            "result": {
                "start_address": addr_str,
                "function": func.getName(),
                "direction": direction,
                "max_steps": max_steps,
                "total_steps": len(steps),
                "steps": steps,
                "sources": [],
                "sinks": [],
            }
        }
    except Exception as e:
        return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Symbol CRUD Handlers
|
|
# ==================================================================
|
|
|
|
def handle_symbol_create(self, exchange):
    """POST /symbols - Create a new label/symbol.

    Body keys read: ``name`` and ``address`` (both required).  The label is
    created with ``SourceType.USER_DEFINED``.

    Returns a success envelope echoing name and address, or an error
    envelope (MISSING_PARAMETER, SYMBOL_ERROR).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    name = body.get("name", "")
    address = body.get("address", "")
    if not name or not address:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Both 'name' and 'address' are required"}}

    try:
        addr = self.program.getAddressFactory().getAddress(address)
        # getAddress() can hand back None for an unparseable address instead
        # of raising; report it rather than calling createLabel(None, ...).
        if addr is None:
            return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": "Invalid address: %s" % address}}
        st = self.program.getSymbolTable()

        def do_create():
            st.createLabel(addr, name, SourceType.USER_DEFINED)

        with_transaction(self.program, "Create symbol", do_create)
        return {"success": True, "result": {
            "name": name,
            "address": address,
            "message": "Symbol created successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
|
|
|
|
def handle_symbol_rename(self, exchange, addr_str):
    """PATCH /symbols/{address} - Rename primary symbol at address.

    Body key read: ``name`` (required), the new symbol name.

    Returns a success envelope with the old and new names, or an error
    envelope (MISSING_PARAMETER, NOT_FOUND, SYMBOL_ERROR).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    new_name = body.get("name", "")
    if not new_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        # getAddress() can hand back None for an unparseable address instead
        # of raising; report it rather than looking up a symbol at None.
        if addr is None:
            return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": "Invalid address: %s" % addr_str}}
        st = self.program.getSymbolTable()
        symbol = st.getPrimarySymbol(addr)
        if not symbol:
            return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}

        # Read the old name before the rename so the response can report both.
        old_name = symbol.getName()

        def do_rename():
            symbol.setName(new_name, SourceType.USER_DEFINED)

        with_transaction(self.program, "Rename symbol", do_rename)
        return {"success": True, "result": {
            "address": addr_str,
            "oldName": old_name,
            "newName": new_name,
            "message": "Symbol renamed successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
|
|
|
|
def handle_symbol_delete(self, exchange, addr_str):
    """DELETE /symbols/{address} - Delete primary symbol at address.

    Returns a success envelope with the deleted symbol's name, or an error
    envelope (NOT_FOUND, SYMBOL_ERROR).
    """
    if not self.program:
        return self._no_program()

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        # getAddress() can hand back None for an unparseable address instead
        # of raising; report it rather than looking up a symbol at None.
        if addr is None:
            return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": "Invalid address: %s" % addr_str}}
        st = self.program.getSymbolTable()
        symbol = st.getPrimarySymbol(addr)
        if not symbol:
            return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}

        # Read the name before deleting so the response can report it.
        name = symbol.getName()

        # One-element list so the nested function can report the outcome
        # (this file targets Python 2: no `nonlocal`).
        deleted = [False]

        def do_delete():
            deleted[0] = symbol.delete()

        with_transaction(self.program, "Delete symbol", do_delete)
        if not deleted[0]:
            # delete() signalled failure; do not claim the symbol is gone.
            return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": "Symbol could not be deleted: %s" % name}}
        return {"success": True, "result": {
            "address": addr_str,
            "name": name,
            "message": "Symbol deleted successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Variable Rename Handler
|
|
# ==================================================================
|
|
|
|
def handle_variable_rename(self, exchange, addr_str, var_name):
    """PATCH /functions/{address}/variables/{name} - Rename/retype a variable.

    ``var_name`` is URL-decoded before matching.  Body keys read: ``name``
    (required new name) and ``data_type`` (optional new type).

    Returns a success envelope with the old and new names, or an error
    envelope (MISSING_PARAMETER, FUNCTION_NOT_FOUND, NOT_FOUND,
    UNKNOWN_TYPE, VARIABLE_ERROR).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    new_name = body.get("name", "")
    new_type = body.get("data_type")

    if not new_name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}

    try:
        # URL-decode the variable name (URLDecoder is imported at file top)
        decoded_name = URLDecoder.decode(var_name, "UTF-8")

        func = self._find_function_at(addr_str)
        if not func:
            return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}}

        # Search parameters first, then all variables of the function
        target_var = None
        for param in func.getParameters():
            if param.getName() == decoded_name:
                target_var = param
                break
        if not target_var:
            for var in func.getAllVariables():
                if var.getName() == decoded_name:
                    target_var = var
                    break

        if not target_var:
            return {"success": False, "error": {"code": "NOT_FOUND", "message": "Variable '%s' not found in function" % decoded_name}}

        # Resolve the requested type before changing anything, so an unknown
        # type is reported rather than dropped while the rename still succeeds.
        new_dt = None
        if new_type:
            new_dt = resolve_data_type(self.program.getDataTypeManager(), new_type)
            if not new_dt:
                return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % new_type}}

        old_name = target_var.getName()

        def do_rename():
            target_var.setName(new_name, SourceType.USER_DEFINED)
            if new_dt:
                target_var.setDataType(new_dt, SourceType.USER_DEFINED)

        with_transaction(self.program, "Rename variable", do_rename)
        return {"success": True, "result": {
            "function": addr_str,
            "oldName": old_name,
            "newName": new_name,
            "message": "Variable renamed successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "VARIABLE_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Variables Handler
|
|
# ==================================================================
|
|
|
|
def handle_variables(self, exchange):
    """GET /variables - List global and function variables.

    Query parameters read: ``limit`` (default 100), ``offset`` (default 0),
    ``global_only`` ("true" skips function variables), ``search``
    (case-insensitive substring of the variable name) and whatever
    ``compile_grep`` extracts for the grep filter.

    Every candidate goes through the same pipeline: search filter, offset
    skip, item construction, grep filter.  Collection stops at ``limit``.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    global_only = query.get("global_only", "false").lower() == "true"
    search = query.get("search", "")
    grep_pattern = compile_grep(query)

    variables = []
    # Python 2 has no `nonlocal`, so the skip counter lives in a list cell.
    skipped = [0]

    def full():
        return len(variables) >= limit

    def passes_paging(candidate_name):
        # Search filter first; a matching candidate then uses up one slot of
        # `offset` if any remain.
        if search and search.lower() not in candidate_name.lower():
            return False
        if skipped[0] < offset:
            skipped[0] += 1
            return False
        return True

    def keep(item):
        if grep_matches_item(item, grep_pattern):
            variables.append(item)

    # Function variables (parameters + locals)
    if not global_only:
        for func in self.program.getFunctionManager().getFunctions(True):
            if full():
                break
            func_name = func.getName()
            func_addr = str(func.getEntryPoint())

            groups = (
                ("parameter", func.getParameters()),
                ("local", func.getLocalVariables()),
            )
            for scope, members in groups:
                for member in members:
                    if full():
                        break
                    member_name = member.getName()
                    if not passes_paging(member_name):
                        continue
                    keep({
                        "name": member_name,
                        "type": str(member.getDataType()),
                        "storage": str(member.getVariableStorage()),
                        "scope": scope,
                        "function": func_name,
                        "functionAddress": func_addr,
                    })

    # Global variables (defined data with symbol names)
    symbols = self.program.getSymbolTable()
    for data in self.program.getListing().getDefinedData(True):
        if full():
            break
        addr = data.getAddress()
        symbol = symbols.getPrimarySymbol(addr)
        if not symbol:
            continue
        sym_name = symbol.getName()
        # Skip auto-generated names
        if sym_name.startswith(("DAT_", "s_")):
            continue
        if not passes_paging(sym_name):
            continue
        keep({
            "name": sym_name,
            "address": str(addr),
            "type": str(data.getDataType()),
            "scope": "global",
            "size": data.getLength(),
        })

    return {"success": True, "result": variables, "offset": offset, "limit": limit}
|
|
|
|
# ==================================================================
|
|
# Bookmarks Handlers
|
|
# ==================================================================
|
|
|
|
def handle_bookmarks(self, exchange):
    """GET /bookmarks - List bookmarks with optional filtering.

    Query parameters read: ``limit`` (default 100), ``offset`` (default 0),
    ``type`` (restrict to one bookmark type), ``category`` (exact match)
    and whatever ``compile_grep`` extracts for the grep filter.
    """
    if not self.program:
        return self._no_program()
    params = parse_query_params(exchange)
    limit = parse_int(params.get("limit"), 100)
    offset = parse_int(params.get("offset"), 0)
    type_filter = params.get("type")
    category_filter = params.get("category")
    grep_pattern = compile_grep(params)

    bm = self.program.getBookmarkManager()
    bookmarks = []
    count = 0
    skipped = 0

    # Get bookmark types to iterate
    if type_filter:
        bm_types = [type_filter]
    else:
        # getBookmarkTypes() yields BookmarkType objects, while
        # getBookmarksIterator() takes the type *string*; ask each object
        # for it instead of relying on its str() representation.
        bm_types = [t.getTypeString() for t in bm.getBookmarkTypes()]

    for btype in bm_types:
        if count >= limit:
            break
        try:
            it = bm.getBookmarksIterator(btype)
        except:
            # Deliberately bare, best effort per type: this file uses bare
            # `except` where failures may come from the Java side.
            continue
        while it.hasNext() and count < limit:
            bookmark = it.next()
            if category_filter and bookmark.getCategory() != category_filter:
                continue
            if skipped < offset:
                skipped += 1
                continue
            item = {
                "address": str(bookmark.getAddress()),
                "type": bookmark.getTypeString(),
                "category": bookmark.getCategory(),
                "comment": bookmark.getComment(),
            }
            if not grep_matches_item(item, grep_pattern):
                continue
            bookmarks.append(item)
            count += 1

    return {"success": True, "result": bookmarks, "offset": offset, "limit": limit}
|
|
|
|
def handle_bookmark_create(self, exchange):
    """POST /bookmarks - Create a bookmark.

    Body keys read: ``address`` (required), ``type`` (default "Note"),
    ``category`` (default "") and ``comment`` (default "").

    Returns a success envelope echoing the inputs, or an error envelope
    (MISSING_PARAMETER, BOOKMARK_ERROR).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    address = body.get("address", "")
    btype = body.get("type", "Note")
    category = body.get("category", "")
    comment = body.get("comment", "")

    if not address:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'address' is required"}}

    try:
        addr = self.program.getAddressFactory().getAddress(address)
        # getAddress() can hand back None for an unparseable address instead
        # of raising; report it rather than calling setBookmark(None, ...).
        if addr is None:
            return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": "Invalid address: %s" % address}}
        bm = self.program.getBookmarkManager()

        def do_create():
            bm.setBookmark(addr, btype, category, comment)

        with_transaction(self.program, "Create bookmark", do_create)
        return {"success": True, "result": {
            "address": address,
            "type": btype,
            "category": category,
            "comment": comment,
            "message": "Bookmark created successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}
|
|
|
|
def handle_bookmark_delete(self, exchange, addr_str):
    """DELETE /bookmarks/{address} - Delete all bookmarks at address.

    Returns a success envelope listing the type string of every removed
    bookmark (``count`` may be 0), or a BOOKMARK_ERROR envelope.
    """
    if not self.program:
        return self._no_program()

    try:
        addr = self.program.getAddressFactory().getAddress(addr_str)
        # getAddress() can hand back None for an unparseable address instead
        # of raising; report it rather than calling getBookmarks(None).
        if addr is None:
            return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": "Invalid address: %s" % addr_str}}
        bm = self.program.getBookmarkManager()
        removed = []

        def do_delete():
            # Snapshot into a list first: bookmarks are removed while looping.
            for bookmark in list(bm.getBookmarks(addr)):
                removed.append(bookmark.getTypeString())
                # Use the bookmark's own remove() when it has one, otherwise
                # go through the manager.
                if hasattr(bookmark, 'remove'):
                    bookmark.remove()
                else:
                    bm.removeBookmark(bookmark)

        with_transaction(self.program, "Delete bookmarks", do_delete)
        return {"success": True, "result": {
            "address": addr_str,
            "removedTypes": removed,
            "count": len(removed),
            "message": "Bookmarks deleted successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Enum Handlers
|
|
# ==================================================================
|
|
|
|
def handle_enums(self, exchange):
    """GET /datatypes/enums - List enum data types.

    Query parameters read: ``limit`` (default 100), ``offset`` (default 0)
    and whatever ``compile_grep`` extracts for the grep filter.  Each entry
    carries the enum's name, category, size and its name/value members.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    grep_pattern = compile_grep(query)

    from ghidra.program.model.data import Enum as GhidraEnum

    enums = []
    # Counts down the enums still to be skipped for paging.
    to_skip = offset

    for dt in self.program.getDataTypeManager().getAllDataTypes():
        if len(enums) >= limit:
            break
        if not isinstance(dt, GhidraEnum):
            continue
        if to_skip > 0:
            to_skip -= 1
            continue

        # Get enum members
        members = [{"name": label, "value": dt.getValue(label)} for label in dt.getNames()]

        entry = {
            "name": dt.getName(),
            "category": str(dt.getCategoryPath()),
            "size": dt.getLength(),
            "members": members,
        }
        if grep_matches_item(entry, grep_pattern):
            enums.append(entry)

    return {"success": True, "result": enums, "offset": offset, "limit": limit}
|
|
|
|
def handle_enum_create(self, exchange):
    """POST /datatypes/enums - Create a new enum.

    Body keys read: ``name`` (required) and ``size`` (default 4).

    Returns a success envelope echoing name and size, or an error envelope
    (MISSING_PARAMETER, ENUM_ERROR).
    """
    if not self.program:
        return self._no_program()
    body = parse_json_body(exchange)
    name = body.get("name", "")

    if not name:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' is required"}}

    # Convert after the required-parameter check and inside a guard, so a
    # non-numeric size becomes an error response instead of an uncaught
    # ValueError/TypeError escaping the handler.
    raw_size = body.get("size", 4)
    try:
        size = int(raw_size)
    except (TypeError, ValueError):
        return {"success": False, "error": {"code": "ENUM_ERROR", "message": "Invalid 'size': %s" % (raw_size,)}}

    try:
        from ghidra.program.model.data import EnumDataType

        dtm = self.program.getDataTypeManager()
        new_enum = EnumDataType(name, size)

        def do_create():
            dtm.addDataType(new_enum, None)

        with_transaction(self.program, "Create enum", do_create)
        return {"success": True, "result": {
            "name": name,
            "size": size,
            "message": "Enum created successfully",
        }}
    except Exception as e:
        return {"success": False, "error": {"code": "ENUM_ERROR", "message": str(e)}}
|
|
|
|
# ==================================================================
|
|
# Typedef Handlers
|
|
# ==================================================================
|
|
|
|
def handle_typedefs(self, exchange):
    """GET /datatypes/typedefs - List typedef data types.

    Query parameters read: ``limit`` (default 100), ``offset`` (default 0)
    and whatever ``compile_grep`` extracts for the grep filter.  Each entry
    carries the typedef's name, category, base type name (or None) and size.
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    limit = parse_int(query.get("limit"), 100)
    offset = parse_int(query.get("offset"), 0)
    grep_pattern = compile_grep(query)

    from ghidra.program.model.data import TypeDef

    typedefs = []
    # Counts down the typedefs still to be skipped for paging.
    to_skip = offset

    for dt in self.program.getDataTypeManager().getAllDataTypes():
        if len(typedefs) >= limit:
            break
        if not isinstance(dt, TypeDef):
            continue
        if to_skip > 0:
            to_skip -= 1
            continue

        base = dt.getBaseDataType()
        entry = {
            "name": dt.getName(),
            "category": str(dt.getCategoryPath()),
            "baseType": base.getName() if base else None,
            "size": dt.getLength(),
        }
        if grep_matches_item(entry, grep_pattern):
            typedefs.append(entry)

    return {"success": True, "result": typedefs, "offset": offset, "limit": limit}
|
|
|
|
def handle_typedef_create(self, exchange):
    """POST /datatypes/typedefs - Create a new typedef.

    Body keys read: ``name`` and ``base_type`` (both required).

    Returns a success envelope echoing both values, or an error envelope
    (MISSING_PARAMETER, NOT_FOUND for an unresolvable base type,
    TYPEDEF_ERROR).
    """
    if not self.program:
        return self._no_program()

    payload = parse_json_body(exchange)
    name = payload.get("name", "")
    base_type_name = payload.get("base_type", "")
    if not (name and base_type_name):
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' and 'base_type' are required"}}

    try:
        from ghidra.program.model.data import TypedefDataType

        manager = self.program.getDataTypeManager()

        # Base type lookup goes through the shared resolver
        base_dt = resolve_data_type(manager, base_type_name)
        if not base_dt:
            return {"success": False, "error": {"code": "NOT_FOUND", "message": "Base type not found: %s" % base_type_name}}

        alias = TypedefDataType(name, base_dt)

        def store_alias():
            manager.addDataType(alias, None)

        with_transaction(self.program, "Create typedef", store_alias)
    except Exception as e:
        return {"success": False, "error": {"code": "TYPEDEF_ERROR", "message": str(e)}}

    return {"success": True, "result": {
        "name": name,
        "baseType": base_type_name,
        "message": "Typedef created successfully",
    }}
|
|
|
|
# ==================================================================
|
|
# Legacy Compatibility
|
|
# ==================================================================
|
|
|
|
def handle_decompile_legacy(self, exchange):
    """Handle GET /decompile?name=X or ?address=X (backwards compat).

    The single query value (``name`` preferred over ``address``) is tried as
    a function name first and as an address second.  Unlike the other
    handlers, the success payload is flat (no ``result`` wrapper).
    """
    if not self.program:
        return self._no_program()

    query = parse_query_params(exchange)
    target = query.get("name") or query.get("address")
    if not target:
        return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name' or 'address' parameter"}}

    # Try by name first, then by address
    func = self._find_function_by_name(target) or self._find_function_at(target)
    if not func:
        return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % target}}

    outcome = self.decompiler.decompileFunction(func, 30, getMonitor())
    if not (outcome and outcome.decompileCompleted()):
        return {"success": False, "error": {"code": "DECOMPILE_FAILED", "message": "Decompilation failed"}}

    source_text = outcome.getDecompiledFunction().getC()
    return {
        "success": True,
        "name": func.getName(),
        "address": str(func.getEntryPoint()),
        "decompiled": source_text,
    }
|
|
|
|
|
|
# ========================================================================
|
|
# Server Startup
|
|
# ========================================================================
|
|
|
|
def run_server(port, program, decompiler):
    """Create, configure and start the HTTP server; return it to the caller.

    One ``MCGhidraHandler`` is registered at "/" so it receives every
    request, and requests run on a cached thread pool.
    """
    bind_address = InetSocketAddress(port)
    http = HttpServer.create(bind_address, 0)

    catch_all = MCGhidraHandler(program, decompiler)
    http.createContext("/", catch_all)

    worker_pool = Executors.newCachedThreadPool()
    http.setExecutor(worker_pool)

    http.start()
    println("[MCGhidra] HTTP server started on port %d" % port)
    return http
|
|
|
|
|
|
# ========================================================================
|
|
# Main Entry Point
|
|
# ========================================================================
|
|
|
|
def main():
    """Script entry point: start the HTTP API and block until interrupted.

    The first script argument, when present, is the port; an unparseable
    value falls back to ``DEFAULT_PORT``.  On exit the server is stopped and
    the decompiler is disposed.
    """
    port = DEFAULT_PORT

    # Parse port from script arguments
    args = getScriptArgs()
    if args and len(args) > 0:
        try:
            port = int(args[0])
        except (TypeError, ValueError):
            println("Invalid port number, using default: %d" % DEFAULT_PORT)

    # Initialize decompiler.  The banner below allows for there being no
    # program, so only open one when it exists.
    decompiler = DecompInterface()
    if currentProgram:
        decompiler.openProgram(currentProgram)

    println("=========================================")
    println("  MCGhidra Headless HTTP Server")
    println("=========================================")
    println("  API Version: %s (compat: %d)" % (API_VERSION_STRING, API_VERSION))
    println("  Port: %d" % port)
    println("  Program: %s" % (currentProgram.getName() if currentProgram else "None"))
    println("  Script: Python/Jython (Full API)")
    println("  Routes: %d" % len(ROUTES))
    println("=========================================")

    server = run_server(port, currentProgram, decompiler)

    println("")
    println("MCGhidra Server running. Press Ctrl+C to stop.")
    println("API available at: http://localhost:%d/" % port)

    # Keep the script running
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        # Runs on Ctrl+C and on any other exit from the loop, so the port
        # and the decompiler are always released.
        server.stop(0)
        decompiler.dispose()
        println("[MCGhidra] Server stopped.")
|
|
|
|
|
|
# Run
|
|
# Called unconditionally at module level: this file is meant to be run as a
# Ghidra script (see the usage line in the file header), not imported.
main()
|