# GhydraMCPServer.py - Headless Ghidra script for GhydraMCP HTTP API # Full API parity with the Java plugin implementation. # Python 2 / Jython compatible (no f-strings, no readAllBytes). # # Usage: analyzeHeadless -import -postScript GhydraMCPServer.py [port] # #@category GhydraMCP #@keybinding #@menupath #@toolbar # === Java imports === from com.sun.net.httpserver import HttpServer, HttpHandler from java.net import InetSocketAddress, URLDecoder from java.util.concurrent import Executors from java.io import OutputStream, BufferedReader, InputStreamReader # === Ghidra imports === from ghidra.app.decompiler import DecompInterface from ghidra.program.model.symbol import SourceType from ghidra.program.model.listing import CodeUnit # === Python imports === import json import re import threading import time # === Constants === API_VERSION = 2 # Integer for MCP client compatibility (minimum expected: 2) API_VERSION_STRING = "2.1" DEFAULT_PORT = 8192 # === Thread-safe transaction lock === _tx_lock = threading.Lock() # ======================================================================== # Utility Functions # ======================================================================== def url_decode(s): """URL-decode a string using Java's URLDecoder.""" try: return URLDecoder.decode(s, "UTF-8") except: return s def parse_query_params(exchange): """Parse query string from an HttpExchange into a dict.""" params = {} query = exchange.getRequestURI().getQuery() if query: for part in query.split("&"): if "=" in part: k, v = part.split("=", 1) params[k] = url_decode(v) else: params[part] = "" return params def read_request_body(exchange): """Read the request body as a string (Jython-compatible, no readAllBytes).""" reader = BufferedReader(InputStreamReader(exchange.getRequestBody(), "UTF-8")) lines = [] line = reader.readLine() while line is not None: lines.append(line) line = reader.readLine() reader.close() return "\n".join(lines) def parse_json_body(exchange): """Read and parse JSON body from an HTTP request.""" body = read_request_body(exchange) if not body or not body.strip(): return {} try: return json.loads(body) except: return {} def parse_int(value, default): """Safely parse an integer with a default fallback.""" if value is None: return default try: return int(value) except (ValueError, TypeError): return default def bytes_to_hex(byte_array): """Convert a Java/Python byte array to a hex string.""" return ''.join(['%02x' % (b & 0xff) for b in byte_array]) def hex_to_bytes(hex_str): """Convert a hex string to a list of integer byte values.""" result = [] for i in range(0, len(hex_str), 2): result.append(int(hex_str[i:i+2], 16)) return result def make_link(href): """Create a HATEOAS link dict.""" return {"href": href} def compile_grep(params): """Compile a grep pattern from query params if present. Returns a compiled regex or None if no grep param. Uses re.IGNORECASE by default. """ grep = params.get("grep") if not grep: return None try: return re.compile(grep, re.IGNORECASE) except: return None def grep_matches_item(item, pattern): """Check if any string value in item matches the grep pattern. Searches all string values in dict items, or the string representation of non-dict items. """ if pattern is None: return True if isinstance(item, dict): for value in item.values(): if isinstance(value, (str,)): if pattern.search(value): return True elif isinstance(value, (int, float, bool)): if pattern.search(str(value)): return True return False return bool(pattern.search(str(item))) def with_transaction(program, desc, fn): """Execute fn inside a thread-safe Ghidra transaction.""" _tx_lock.acquire() try: tx_id = program.startTransaction(desc) try: result = fn() program.endTransaction(tx_id, True) return result except: program.endTransaction(tx_id, False) raise finally: _tx_lock.release() # ======================================================================== # Data Type Resolution # ======================================================================== _BUILTIN_TYPES = None def _init_builtin_types(): """Lazily initialize the builtin data type map.""" global _BUILTIN_TYPES if _BUILTIN_TYPES is not None: return _BUILTIN_TYPES _BUILTIN_TYPES = {} try: from ghidra.program.model.data import ( ByteDataType, WordDataType, DWordDataType, QWordDataType, FloatDataType, DoubleDataType, CharDataType, ShortDataType, IntegerDataType, LongDataType, LongLongDataType, PointerDataType, BooleanDataType, StringDataType, UnicodeDataType, Undefined1DataType, Undefined2DataType, Undefined4DataType, Undefined8DataType ) pairs = [ ("byte", ByteDataType), ("word", WordDataType), ("dword", DWordDataType), ("qword", QWordDataType), ("float", FloatDataType), ("double", DoubleDataType), ("char", CharDataType), ("short", ShortDataType), ("int", IntegerDataType), ("long", LongDataType), ("longlong", LongLongDataType), ("pointer", PointerDataType), ("bool", BooleanDataType), ("boolean", BooleanDataType), ("string", StringDataType), ("unicode", UnicodeDataType), ("undefined1", Undefined1DataType), ("undefined2", Undefined2DataType), ("undefined4", Undefined4DataType), ("undefined8", Undefined8DataType), ] for name, cls in pairs: try: _BUILTIN_TYPES[name] = cls.dataType except: pass except ImportError: pass # Also try unsigned variants try: from ghidra.program.model.data import ( UnsignedShortDataType, UnsignedIntegerDataType, UnsignedLongDataType, UnsignedLongLongDataType ) for name, cls in [ ("ushort", UnsignedShortDataType), ("uint", UnsignedIntegerDataType), ("ulong", UnsignedLongDataType), ("ulonglong", UnsignedLongLongDataType), ]: try: _BUILTIN_TYPES[name] = cls.dataType except: pass except ImportError: pass return _BUILTIN_TYPES def resolve_data_type(dtm, type_name): """Resolve a type name string to a Ghidra DataType object.""" if not type_name: return None # Direct path lookup dt = dtm.getDataType("/" + type_name) if dt is not None: return dt # Builtin types map builtins = _init_builtin_types() dt = builtins.get(type_name.lower()) if dt is not None: return dt # Search all data types (expensive fallback) it = dtm.getAllDataTypes() while it.hasNext(): candidate = it.next() if candidate.getName().lower() == type_name.lower(): return candidate return None # ======================================================================== # Comment type mapping # ======================================================================== COMMENT_TYPE_MAP = { "pre": CodeUnit.PRE_COMMENT, "post": CodeUnit.POST_COMMENT, "eol": CodeUnit.EOL_COMMENT, "plate": CodeUnit.PLATE_COMMENT, "repeatable": CodeUnit.REPEATABLE_COMMENT, } # ======================================================================== # Route Table # ======================================================================== ROUTES = [ # (HTTP method, path regex, handler method name) # Root / Meta ("GET", r"^/$", "handle_root"), ("GET", r"^/info$", "handle_info"), ("GET", r"^/program$", "handle_program"), # Headless stubs (cursor-dependent, not available headless) ("GET", r"^/address$", "handle_address_stub"), ("GET", r"^/function$", "handle_function_stub"), # Functions - by-name variants (must precede address patterns) ("GET", r"^/functions/by-name/([^/]+)/decompile$", "handle_decompile_by_name"), ("GET", r"^/functions/by-name/([^/]+)/disassembly$", "handle_disassembly_by_name"), ("PUT", r"^/functions/by-name/([^/]+)/signature$", "handle_signature_by_name"), ("PATCH", r"^/functions/by-name/([^/]+)$", "handle_patch_function_by_name"), ("GET", r"^/functions/by-name/([^/]+)$", "handle_function_by_name"), # Functions - by address ("GET", r"^/functions/([^/]+)/decompile$", "handle_decompile"), ("GET", r"^/functions/([^/]+)/disassembly$", "handle_disassembly"), ("GET", r"^/functions/([^/]+)/variables$", "handle_function_variables"), ("PUT", r"^/functions/([^/]+)/signature$", "handle_signature"), ("PATCH", r"^/functions/([^/]+)$", "handle_patch_function"), ("GET", r"^/functions/([^/]+)$", "handle_function_detail"), # Functions list / create ("GET", r"^/functions/?$", "handle_functions_list"), ("POST", r"^/functions/?$", "handle_create_function"), # Data ("POST", r"^/data/delete$", "handle_data_delete"), ("POST", r"^/data/type$", "handle_data_type"), ("GET", r"^/data/strings$", "handle_strings"), ("GET", r"^/data/?$", "handle_data_list"), ("POST", r"^/data/?$", "handle_data_create"), # Strings ("GET", r"^/strings$", "handle_strings"), # Memory ("GET", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_get_comment"), ("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"), ("GET", r"^/memory/blocks$", "handle_memory_blocks"), ("GET", r"^/memory$", "handle_memory_read"), ("PATCH", r"^/programs/current/memory/([^/]+)$", "handle_memory_write"), # Segments ("GET", r"^/segments$", "handle_segments"), # Symbols ("GET", r"^/symbols/imports$", "handle_imports"), ("GET", r"^/symbols/exports$", "handle_exports"), ("POST", r"^/symbols$", "handle_symbol_create"), ("PATCH", r"^/symbols/([^/]+)$", "handle_symbol_rename"), ("DELETE", r"^/symbols/([^/]+)$", "handle_symbol_delete"), ("GET", r"^/symbols$", "handle_symbols"), # Variables ("PATCH", r"^/functions/([^/]+)/variables/([^/]+)$", "handle_variable_rename"), ("GET", r"^/variables$", "handle_variables"), # Bookmarks ("POST", r"^/bookmarks$", "handle_bookmark_create"), ("DELETE", r"^/bookmarks/([^/]+)$", "handle_bookmark_delete"), ("GET", r"^/bookmarks$", "handle_bookmarks"), # Data types ("POST", r"^/datatypes/enums$", "handle_enum_create"), ("GET", r"^/datatypes/enums$", "handle_enums"), ("POST", r"^/datatypes/typedefs$", "handle_typedef_create"), ("GET", r"^/datatypes/typedefs$", "handle_typedefs"), # Cross-references ("GET", r"^/xrefs$", "handle_xrefs"), # Classes & Namespaces ("GET", r"^/classes$", "handle_classes"), ("GET", r"^/namespaces$", "handle_namespaces"), # Structs ("POST", r"^/structs/create$", "handle_struct_create"), ("POST", r"^/structs/addfield$", "handle_struct_addfield"), ("POST", r"^/structs/updatefield$", "handle_struct_updatefield"), ("POST", r"^/structs/delete$", "handle_struct_delete"), ("GET", r"^/structs$", "handle_structs"), # Analysis ("GET", r"^/analysis/callgraph$", "handle_callgraph"), ("GET", r"^/analysis/dataflow$", "handle_dataflow"), ("GET", r"^/analysis$", "handle_analysis_info"), ("POST", r"^/analysis$", "handle_analysis_run"), # Legacy compatibility ("GET", r"^/decompile$", "handle_decompile_legacy"), ] # ======================================================================== # HTTP Handler # ======================================================================== class GhydraMCPHandler(HttpHandler): def __init__(self, program, decompiler): self.program = program self.decompiler = decompiler # Pre-compile route patterns self.routes = [] for method, pattern, handler_name in ROUTES: self.routes.append((method, re.compile(pattern), handler_name)) # ------------------------------------------------------------------ # Dispatch # ------------------------------------------------------------------ def handle(self, exchange): try: method = exchange.getRequestMethod() path = exchange.getRequestURI().getPath() # CORS preflight if method == "OPTIONS": self._send_cors_preflight(exchange) return # Match routes for route_method, pattern, handler_name in self.routes: if method != route_method: continue match = pattern.match(path) if match: handler = getattr(self, handler_name) groups = match.groups() decoded = tuple(url_decode(g) for g in groups) result = handler(exchange, *decoded) if isinstance(result, tuple): response, code = result self._send_response(exchange, code, response) else: self._send_response(exchange, 200, result) return # No route matched self._send_response(exchange, 404, { "success": False, "error": {"code": "NOT_FOUND", "message": "Endpoint not found: %s %s" % (method, path)} }) except Exception as e: try: self._send_response(exchange, 500, { "success": False, "error": {"code": "INTERNAL_ERROR", "message": str(e)} }) except: pass def _send_response(self, exchange, code, data): response_bytes = json.dumps(data, indent=2).encode('utf-8') headers = exchange.getResponseHeaders() headers.set("Content-Type", "application/json; charset=utf-8") headers.set("Access-Control-Allow-Origin", "*") headers.set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS") headers.set("Access-Control-Allow-Headers", "Content-Type, X-Request-ID") exchange.sendResponseHeaders(code, len(response_bytes)) os = exchange.getResponseBody() os.write(response_bytes) os.close() def _send_cors_preflight(self, exchange): headers = exchange.getResponseHeaders() headers.set("Access-Control-Allow-Origin", "*") headers.set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS") headers.set("Access-Control-Allow-Headers", "Content-Type, X-Request-ID") headers.set("Access-Control-Max-Age", "86400") exchange.sendResponseHeaders(204, -1) exchange.getResponseBody().close() # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _no_program(self): return {"success": False, "error": {"code": "NO_PROGRAM_LOADED", "message": "No program loaded"}} def _find_function_at(self, addr_str): """Find a function by address string. Falls back to containing function.""" try: addr = self.program.getAddressFactory().getAddress(addr_str) fm = self.program.getFunctionManager() func = fm.getFunctionAt(addr) if func is None: func = fm.getFunctionContaining(addr) return func except: return None def _find_function_by_name(self, name): """Find a function by exact name match.""" fm = self.program.getFunctionManager() for func in fm.getFunctions(True): if func.getName() == name: return func return None def _get_function(self, identifier, by_name=False): """Resolve a function by name or address.""" if by_name: return self._find_function_by_name(identifier) return self._find_function_at(identifier) def _decompile_function(self, func): """Decompile a function and return the result dict.""" result = self.decompiler.decompileFunction(func, 60, getMonitor()) resp = { "name": func.getName(), "address": str(func.getEntryPoint()), } if result and result.decompileCompleted(): decomp = result.getDecompiledFunction() code = decomp.getC() resp["decompiled"] = code resp["decompiled_text"] = code # alias for MCP client compat resp["ccode"] = code # alias for API spec compat resp["signature"] = decomp.getSignature() else: msg = result.getErrorMessage() if result else "Unknown error" resp["error"] = "Decompilation failed: %s" % msg addr = str(func.getEntryPoint()) resp["_links"] = { "self": make_link("/functions/%s/decompile" % addr), "function": make_link("/functions/%s" % addr), "disassembly": make_link("/functions/%s/disassembly" % addr), } return {"success": True, "result": resp} def _build_disassembly(self, func): """Get disassembly instructions for a function.""" instructions = [] listing = self.program.getListing() body = func.getBody() inst_iter = listing.getInstructions(body, True) while inst_iter.hasNext(): instr = inst_iter.next() full_repr = str(instr) mnemonic = instr.getMnemonicString() operands = full_repr[len(mnemonic):].strip() if len(full_repr) > len(mnemonic) else "" instructions.append({ "address": str(instr.getAddress()), "mnemonic": mnemonic, "operands": operands, "bytes": bytes_to_hex(instr.getBytes()), }) addr = str(func.getEntryPoint()) # Build text representation for MCP client lines = [] for inst in instructions: lines.append("%s %s %s" % (inst["address"], inst["mnemonic"], inst["operands"])) disassembly_text = "\n".join(lines) return { "success": True, "result": { "name": func.getName(), "address": addr, "instructions": instructions, "instructionCount": len(instructions), "disassembly_text": disassembly_text, "_links": { "self": make_link("/functions/%s/disassembly" % addr), "function": make_link("/functions/%s" % addr), "decompile": make_link("/functions/%s/decompile" % addr), }, } } def _build_function_info(self, func): """Build a detailed function info dict.""" addr = str(func.getEntryPoint()) info = { "name": func.getName(), "address": addr, "signature": str(func.getSignature()), "returnType": func.getReturnType().getName(), "parameterCount": func.getParameterCount(), "callingConvention": func.getCallingConventionName(), "isThunk": func.isThunk(), "isExternal": func.isExternal(), "bodySize": func.getBody().getNumAddresses(), } # Parameters params = [] for param in func.getParameters(): params.append({ "name": param.getName(), "type": param.getDataType().getName(), "ordinal": param.getOrdinal(), }) info["parameters"] = params # Comments entry = func.getEntryPoint() listing = self.program.getListing() cu = listing.getCodeUnitAt(entry) if cu: for ctype, cname in [ (CodeUnit.PRE_COMMENT, "preComment"), (CodeUnit.POST_COMMENT, "postComment"), (CodeUnit.EOL_COMMENT, "eolComment"), (CodeUnit.PLATE_COMMENT, "plateComment"), ]: c = cu.getComment(ctype) if c: info[cname] = c func_comment = func.getComment() if func_comment: info["comment"] = func_comment # HATEOAS links info["_links"] = { "self": make_link("/functions/%s" % addr), "decompile": make_link("/functions/%s/decompile" % addr), "disassembly": make_link("/functions/%s/disassembly" % addr), "variables": make_link("/functions/%s/variables" % addr), "xrefs_to": make_link("/xrefs?to_addr=%s" % addr), "xrefs_from": make_link("/xrefs?from_addr=%s" % addr), } return info def _build_xref_info(self, ref): """Build a cross-reference info dict.""" info = { "fromAddress": str(ref.getFromAddress()), "toAddress": str(ref.getToAddress()), "type": self._get_ref_type_name(ref.getReferenceType()), "isPrimary": ref.isPrimary(), } fm = self.program.getFunctionManager() from_func = fm.getFunctionContaining(ref.getFromAddress()) to_func = fm.getFunctionContaining(ref.getToAddress()) if from_func: info["fromFunction"] = from_func.getName() if to_func: info["toFunction"] = to_func.getName() return info def _get_ref_type_name(self, ref_type): if ref_type.isCall(): return "CALL" if ref_type.isData(): return "DATA" if ref_type.isRead(): return "READ" if ref_type.isWrite(): return "WRITE" if ref_type.isJump(): return "JUMP" return str(ref_type) def _find_struct(self, name): """Find a struct/union data type by name.""" dtm = self.program.getDataTypeManager() it = dtm.getAllDataTypes() while it.hasNext(): dt = it.next() if dt.getName() == name: from ghidra.program.model.data import Structure, Union if isinstance(dt, (Structure, Union)): return dt return None # ================================================================== # Root / Meta Handlers # ================================================================== def handle_root(self, exchange): result = { "success": True, "api_version": API_VERSION, "api_version_string": API_VERSION_STRING, "message": "GhydraMCP Headless API", "mode": "headless", } if self.program: result["program"] = self.program.getName() result["file"] = self.program.getExecutablePath() result["language"] = str(self.program.getLanguageID()) result["_links"] = { "self": make_link("/"), "info": make_link("/info"), "program": make_link("/program"), "functions": make_link("/functions"), "data": make_link("/data"), "strings": make_link("/strings"), "memory": make_link("/memory"), "segments": make_link("/segments"), "symbols": make_link("/symbols"), "xrefs": make_link("/xrefs"), "classes": make_link("/classes"), "namespaces": make_link("/namespaces"), "structs": make_link("/structs"), "analysis": make_link("/analysis"), } return result def handle_info(self, exchange): if not self.program: return self._no_program() mem = self.program.getMemory() result = { "success": True, "result": { "name": self.program.getName(), "path": self.program.getExecutablePath(), "language": str(self.program.getLanguageID()), "processor": str(self.program.getLanguage().getProcessor()), "addressSize": self.program.getAddressFactory().getDefaultAddressSpace().getSize(), "imageBase": str(self.program.getImageBase()), "minAddress": str(mem.getMinAddress()), "maxAddress": str(mem.getMaxAddress()), "memorySize": mem.getSize(), "functionCount": self.program.getFunctionManager().getFunctionCount(), "mode": "headless", "_links": { "self": make_link("/info"), "program": make_link("/program"), "functions": make_link("/functions"), }, } } return result def handle_program(self, exchange): if not self.program: return self._no_program() mem = self.program.getMemory() fm = self.program.getFunctionManager() result = { "name": self.program.getName(), "path": self.program.getExecutablePath(), "language": str(self.program.getLanguageID()), "compiler": str(self.program.getCompilerSpec().getCompilerSpecID()), "processor": str(self.program.getLanguage().getProcessor()), "addressSize": self.program.getAddressFactory().getDefaultAddressSpace().getSize(), "imageBase": str(self.program.getImageBase()), "minAddress": str(mem.getMinAddress()), "maxAddress": str(mem.getMaxAddress()), "memorySize": mem.getSize(), "functionCount": fm.getFunctionCount(), "_links": { "self": make_link("/program"), "functions": make_link("/functions"), "symbols": make_link("/symbols"), "segments": make_link("/segments"), "analysis": make_link("/analysis"), }, } return {"success": True, "result": result} # ================================================================== # Headless Stubs # ================================================================== def handle_address_stub(self, exchange): return { "success": False, "error": { "code": "HEADLESS_MODE", "message": "Current address is not available in headless mode. Use /functions or /data with address parameters instead." } } def handle_function_stub(self, exchange): return { "success": False, "error": { "code": "HEADLESS_MODE", "message": "Current function is not available in headless mode. Use /functions/{address} instead." } } # ================================================================== # Function Handlers # ================================================================== def handle_functions_list(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) name_filter = params.get("name") name_contains = params.get("name_contains") name_regex = params.get("name_matches_regex") addr_filter = params.get("addr") grep_pattern = compile_grep(params) # Compile name regex if provided name_regex_pat = None if name_regex: try: name_regex_pat = re.compile(name_regex, re.IGNORECASE) except: return {"success": False, "error": {"code": "INVALID_REGEX", "message": "Invalid regex: %s" % name_regex}} fm = self.program.getFunctionManager() total = fm.getFunctionCount() # Single function lookup by address if addr_filter: func = self._find_function_at(addr_filter) if not func: return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit} addr = str(func.getEntryPoint()) return {"success": True, "result": [{ "name": func.getName(), "address": addr, "signature": str(func.getSignature()), "parameterCount": func.getParameterCount(), "isThunk": func.isThunk(), "_links": { "self": make_link("/functions/%s" % addr), "decompile": make_link("/functions/%s/decompile" % addr), "disassembly": make_link("/functions/%s/disassembly" % addr), }, }], "size": 1, "offset": 0, "limit": limit} functions = [] count = 0 skipped = 0 for func in fm.getFunctions(True): if count >= limit: break func_name = func.getName() # Apply name filters if name_filter and name_filter.lower() not in func_name.lower(): continue if name_contains and name_contains.lower() not in func_name.lower(): continue if name_regex_pat and not name_regex_pat.search(func_name): continue if skipped < offset: skipped += 1 continue addr = str(func.getEntryPoint()) item = { "name": func_name, "address": addr, "signature": str(func.getSignature()), "parameterCount": func.getParameterCount(), "isThunk": func.isThunk(), "_links": { "self": make_link("/functions/%s" % addr), "decompile": make_link("/functions/%s/decompile" % addr), "disassembly": make_link("/functions/%s/disassembly" % addr), }, } if not grep_matches_item(item, grep_pattern): continue functions.append(item) count += 1 result = { "success": True, "result": functions, "size": total, "offset": offset, "limit": limit, "_links": { "self": make_link("/functions?offset=%d&limit=%d" % (offset, limit)), }, } if offset + count < total: result["_links"]["next"] = make_link("/functions?offset=%d&limit=%d" % (offset + limit, limit)) if offset > 0: result["_links"]["prev"] = make_link("/functions?offset=%d&limit=%d" % (max(0, offset - limit), limit)) return result def handle_function_detail(self, exchange, addr_str): if not self.program: return self._no_program() func = self._find_function_at(addr_str) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} return {"success": True, "result": self._build_function_info(func)} def handle_function_by_name(self, exchange, name): if not self.program: return self._no_program() func = self._find_function_by_name(name) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}} return {"success": True, "result": self._build_function_info(func)} # -- Decompile -- def handle_decompile(self, exchange, addr_str): if not self.program: return self._no_program() func = self._find_function_at(addr_str) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} return self._decompile_function(func) def handle_decompile_by_name(self, exchange, name): if not self.program: return self._no_program() func = self._find_function_by_name(name) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}} return self._decompile_function(func) # -- Disassembly -- def handle_disassembly(self, exchange, addr_str): if not self.program: return self._no_program() func = self._find_function_at(addr_str) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} return self._build_disassembly(func) def handle_disassembly_by_name(self, exchange, name): if not self.program: return self._no_program() func = self._find_function_by_name(name) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}} return self._build_disassembly(func) # -- Variables -- def handle_function_variables(self, exchange, addr_str): if not self.program: return self._no_program() func = self._find_function_at(addr_str) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} variables = [] # Parameters for param in func.getParameters(): variables.append({ "name": param.getName(), "type": param.getDataType().getName(), "storage": str(param.getVariableStorage()), "kind": "parameter", "ordinal": param.getOrdinal(), }) # Local variables for local in func.getLocalVariables(): var_info = { "name": local.getName(), "type": local.getDataType().getName(), "storage": str(local.getVariableStorage()), "kind": "local", } try: so = local.getStackOffset() if so != 0: var_info["stackOffset"] = so except: pass variables.append(var_info) addr = str(func.getEntryPoint()) return { "success": True, "result": { "name": func.getName(), "address": addr, "variables": variables, "parameterCount": func.getParameterCount(), "localVariableCount": len(func.getLocalVariables()), } } # -- Patch (rename / comment) -- def _handle_patch_function_impl(self, exchange, identifier, by_name): if not self.program: return self._no_program() func = self._get_function(identifier, by_name) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % identifier}} body = parse_json_body(exchange) new_name = body.get("name") comment = body.get("comment") if not new_name and comment is None: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide 'name' and/or 'comment' in request body"}} old_name = func.getName() addr = str(func.getEntryPoint()) result = {"address": addr, "oldName": old_name} def do_patch(): if new_name: func.setName(new_name, SourceType.USER_DEFINED) result["newName"] = new_name if comment is not None: func.setComment(comment) result["comment"] = comment with_transaction(self.program, "Patch function", do_patch) result["message"] = "Function updated successfully" return {"success": True, "result": result} def handle_patch_function(self, exchange, addr_str): return self._handle_patch_function_impl(exchange, addr_str, by_name=False) def handle_patch_function_by_name(self, exchange, name): return self._handle_patch_function_impl(exchange, name, by_name=True) # -- Create function -- def handle_create_function(self, exchange): if not self.program: return self._no_program() body = parse_json_body(exchange) addr_str = body.get("address") if not addr_str: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' in request body"}} try: addr = self.program.getAddressFactory().getAddress(addr_str) except: return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} result_holder = [None] def do_create(): fm = self.program.getFunctionManager() func = fm.createFunction(None, addr, None, SourceType.USER_DEFINED) result_holder[0] = func try: with_transaction(self.program, "Create function", do_create) func = result_holder[0] if func: return ({"success": True, "result": { "name": func.getName(), "address": str(func.getEntryPoint()), "message": "Function created successfully", }}, 201) return {"success": False, "error": {"code": "CREATE_FAILED", "message": "Failed to create function at %s" % addr_str}} except Exception as e: return {"success": False, "error": {"code": "CREATE_ERROR", "message": str(e)}} # -- Signature -- def _handle_signature_impl(self, exchange, identifier, by_name): if not self.program: return self._no_program() func = self._get_function(identifier, by_name) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % identifier}} body = parse_json_body(exchange) sig_str = body.get("signature") if not sig_str: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'signature' in request body"}} addr = str(func.getEntryPoint()) try: from ghidra.app.util.cparser.C import CParser dtm = self.program.getDataTypeManager() parser = CParser(dtm) parsed = parser.parse(sig_str + ";") def do_set_sig(): from ghidra.program.model.data import FunctionDefinitionDataType if isinstance(parsed, FunctionDefinitionDataType): from ghidra.app.cmd.function import ApplyFunctionSignatureCmd cmd = ApplyFunctionSignatureCmd(func.getEntryPoint(), parsed, SourceType.USER_DEFINED) cmd.applyTo(self.program, getMonitor()) with_transaction(self.program, "Set function signature", do_set_sig) return {"success": True, "result": { "address": addr, "signature": sig_str, "message": "Signature updated", }} except Exception as e: # Fallback: try just setting name from signature return {"success": False, "error": {"code": "SIGNATURE_ERROR", "message": "Could not parse signature: %s" % str(e)}} def handle_signature(self, exchange, addr_str): return self._handle_signature_impl(exchange, addr_str, by_name=False) def handle_signature_by_name(self, exchange, name): return self._handle_signature_impl(exchange, name, by_name=True) # ================================================================== # Data Handlers # ================================================================== def handle_data_list(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) addr_filter = params.get("addr") name_filter = params.get("name") name_contains = params.get("name_contains") type_filter = params.get("type") grep_pattern = compile_grep(params) # Single address lookup if addr_filter: try: addr = self.program.getAddressFactory().getAddress(addr_filter) data = self.program.getListing().getDataAt(addr) if data: value = data.getDefaultValueRepresentation() if value and len(value) > 200: value = value[:200] + "..." item = { "address": str(data.getAddress()), "type": data.getDataType().getName(), "length": data.getLength(), "value": value, } sym = self.program.getSymbolTable().getPrimarySymbol(addr) if sym: item["name"] = sym.getName() return {"success": True, "result": [item], "size": 1, "offset": 0, "limit": 1} return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit} except: return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit} data_items = [] listing = self.program.getListing() st = self.program.getSymbolTable() count = 0 skipped = 0 for data in listing.getDefinedData(True): if count >= limit: break # Type filter if type_filter and type_filter.lower() not in data.getDataType().getName().lower(): continue # Name filters (require symbol lookup) if name_filter or name_contains: sym = st.getPrimarySymbol(data.getAddress()) sym_name = sym.getName() if sym else "" if name_filter and sym_name != name_filter: continue if name_contains and name_contains.lower() not in sym_name.lower(): continue if skipped < offset: skipped += 1 continue value = data.getDefaultValueRepresentation() if value and len(value) > 200: value = value[:200] + "..." item = { "address": str(data.getAddress()), "type": data.getDataType().getName(), "length": data.getLength(), "value": value, } sym = st.getPrimarySymbol(data.getAddress()) if sym: item["name"] = sym.getName() item["_links"] = {"self": make_link("/data/%s" % str(data.getAddress()))} if not grep_matches_item(item, grep_pattern): continue data_items.append(item) count += 1 return { "success": True, "result": data_items, "offset": offset, "limit": limit, } def handle_data_create(self, exchange): if not self.program: return self._no_program() body = parse_json_body(exchange) addr_str = body.get("address") if not addr_str: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}} try: addr = self.program.getAddressFactory().getAddress(addr_str) except: return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} # Label creation (newName field) new_name = body.get("newName") if new_name: def do_label(): self.program.getSymbolTable().createLabel(addr, new_name, SourceType.USER_DEFINED) try: with_transaction(self.program, "Create label", do_label) return {"success": True, "result": {"address": addr_str, "name": new_name, "message": "Label created"}} except Exception as e: return {"success": False, "error": {"code": "LABEL_ERROR", "message": str(e)}} # Data creation (type field) type_name = body.get("type") if not type_name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'type' or 'newName'"}} dtm = self.program.getDataTypeManager() dt = resolve_data_type(dtm, type_name) if not dt: return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}} def do_create_data(): from ghidra.program.model.data import DataUtilities DataUtilities.createData( self.program, addr, dt, -1, False, DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA ) try: with_transaction(self.program, "Create data", do_create_data) return ({"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data created"}}, 201) except Exception as e: return {"success": False, "error": {"code": "DATA_ERROR", "message": str(e)}} def handle_data_delete(self, exchange): if not self.program: return self._no_program() body = parse_json_body(exchange) addr_str = body.get("address") if not addr_str: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}} try: addr = self.program.getAddressFactory().getAddress(addr_str) except: return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} def do_delete(): self.program.getListing().clearCodeUnits(addr, addr, False) try: with_transaction(self.program, "Delete data", do_delete) return {"success": True, "result": {"address": addr_str, "message": "Data deleted"}} except Exception as e: return {"success": False, "error": {"code": "DELETE_ERROR", "message": str(e)}} def handle_data_type(self, exchange): """Change data type at an address (clear + recreate).""" if not self.program: return self._no_program() body = parse_json_body(exchange) addr_str = body.get("address") type_name = body.get("type") if not addr_str or not type_name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' and/or 'type'"}} try: addr = self.program.getAddressFactory().getAddress(addr_str) except: return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} dtm = self.program.getDataTypeManager() dt = resolve_data_type(dtm, type_name) if not dt: return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}} def do_retype(): from ghidra.program.model.data import DataUtilities self.program.getListing().clearCodeUnits(addr, addr, False) DataUtilities.createData( self.program, addr, dt, -1, False, DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA ) try: with_transaction(self.program, "Change data type", do_retype) return {"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data type changed"}} except Exception as e: return {"success": False, "error": {"code": "TYPE_ERROR", "message": str(e)}} # ================================================================== # Strings Handler # ================================================================== def handle_strings(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 2000) offset = parse_int(params.get("offset"), 0) filter_str = params.get("filter") min_length = parse_int(params.get("min_length"), 2) grep_pattern = compile_grep(params) strings = [] listing = self.program.getListing() ref_mgr = self.program.getReferenceManager() count = 0 skipped = 0 for data in listing.getDefinedData(True): if count >= limit: break try: dt = data.getDataType() if not dt: continue type_name = dt.getName().lower() if "string" not in type_name and type_name not in ("char", "wchar"): continue value = data.getValue() if not value: continue str_val = str(value) if len(str_val) < min_length: continue if filter_str and filter_str.lower() not in str_val.lower(): continue if skipped < offset: skipped += 1 continue item = { "address": str(data.getAddress()), "value": str_val[:200], "length": len(str_val), "type": dt.getName(), } # Xref count ref_count = 0 refs = ref_mgr.getReferencesTo(data.getAddress()) while refs.hasNext(): refs.next() ref_count += 1 item["xrefCount"] = ref_count # Label name sym = self.program.getSymbolTable().getPrimarySymbol(data.getAddress()) if sym: item["name"] = sym.getName() if not grep_matches_item(item, grep_pattern): continue strings.append(item) count += 1 except: pass return { "success": True, "result": strings, "size": len(strings), "offset": offset, "limit": limit, } # ================================================================== # Memory Handlers # ================================================================== def handle_memory_read(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) addr_str = params.get("address") if not addr_str: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}} length = parse_int(params.get("length"), 16) fmt = params.get("format", "hex") if length <= 0 or length > 4096: return {"success": False, "error": {"code": "INVALID_PARAMETER", "message": "Length must be 1-4096"}} try: addr = self.program.getAddressFactory().getAddress(addr_str) mem = self.program.getMemory() from jarray import zeros byte_array = zeros(length, 'b') bytes_read = mem.getBytes(addr, byte_array) if fmt == "base64": from java.util import Base64 # Convert to unsigned bytes for Base64 import array as pyarray unsigned = bytearray([(b & 0xff) for b in byte_array[:bytes_read]]) formatted = Base64.getEncoder().encodeToString(unsigned) elif fmt == "string": chars = [] for i in range(bytes_read): b = byte_array[i] & 0xff if 32 <= b < 127: chars.append(chr(b)) else: chars.append('.') formatted = ''.join(chars) else: formatted = bytes_to_hex(byte_array[:bytes_read]) return { "success": True, "result": { "address": str(addr), "bytesRead": bytes_read, "format": fmt, "bytes": formatted, "hexBytes": bytes_to_hex(byte_array[:bytes_read]), "rawBytes": bytes_to_hex(byte_array[:bytes_read]), } } except Exception as e: return {"success": False, "error": {"code": "MEMORY_ERROR", "message": str(e)}} def handle_memory_write(self, exchange, addr_str): if not self.program: return self._no_program() body = parse_json_body(exchange) bytes_str = body.get("bytes") fmt = body.get("format", "hex") if not bytes_str: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'bytes'"}} try: addr = self.program.getAddressFactory().getAddress(addr_str) if fmt == "base64": from java.util import Base64 byte_vals = list(Base64.getDecoder().decode(bytes_str)) elif fmt == "string": byte_vals = [ord(c) for c in bytes_str] else: byte_vals = hex_to_bytes(bytes_str) # Convert to Java byte array from jarray import zeros java_bytes = zeros(len(byte_vals), 'b') for i, b in enumerate(byte_vals): java_bytes[i] = b if b <= 127 else b - 256 def do_write(): self.program.getMemory().setBytes(addr, java_bytes) with_transaction(self.program, "Write memory", do_write) return {"success": True, "result": {"address": str(addr), "bytesWritten": len(byte_vals)}} except Exception as e: return {"success": False, "error": {"code": "MEMORY_ERROR", "message": str(e)}} def handle_memory_blocks(self, exchange): """List memory blocks (alias for /segments).""" return self.handle_segments(exchange) def handle_get_comment(self, exchange, addr_str, comment_type): """Get a comment at a specific address.""" if not self.program: return self._no_program() ct = COMMENT_TYPE_MAP.get(comment_type.lower()) if ct is None: return {"success": False, "error": { "code": "INVALID_COMMENT_TYPE", "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type }} try: addr = self.program.getAddressFactory().getAddress(addr_str) listing = self.program.getListing() cu = listing.getCodeUnitAt(addr) if not cu: cu = listing.getCodeUnitContaining(addr) if not cu: return {"success": True, "result": {"address": addr_str, "commentType": comment_type, "comment": None}} comment = cu.getComment(ct) return {"success": True, "result": { "address": addr_str, "commentType": comment_type, "comment": comment, }} except Exception as e: return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}} def handle_set_comment(self, exchange, addr_str, comment_type): """Set a comment at a specific address.""" if not self.program: return self._no_program() body = parse_json_body(exchange) comment_text = body.get("comment", "") ct = COMMENT_TYPE_MAP.get(comment_type.lower()) if ct is None: return {"success": False, "error": { "code": "INVALID_COMMENT_TYPE", "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type }} try: addr = self.program.getAddressFactory().getAddress(addr_str) listing = self.program.getListing() cu = listing.getCodeUnitAt(addr) if not cu: cu = listing.getCodeUnitContaining(addr) if not cu: return {"success": False, "error": {"code": "NO_CODE_UNIT", "message": "No code unit at address: %s" % addr_str}} def do_comment(): cu.setComment(ct, comment_text if comment_text else None) with_transaction(self.program, "Set comment", do_comment) return {"success": True, "result": { "address": addr_str, "commentType": comment_type, "comment": comment_text, "message": "Comment set successfully", }} except Exception as e: return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}} # ================================================================== # Segments Handler # ================================================================== def handle_segments(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) name_filter = params.get("name") segments = [] for block in self.program.getMemory().getBlocks(): if name_filter and name_filter not in block.getName(): continue segments.append({ "name": block.getName(), "start": str(block.getStart()), "end": str(block.getEnd()), "size": block.getSize(), "readable": block.isRead(), "writable": block.isWrite(), "executable": block.isExecute(), "initialized": block.isInitialized(), "_links": { "self": make_link("/segments/%s" % block.getName()), "memory": make_link("/memory?address=%s&length=1024" % str(block.getStart())), }, }) return {"success": True, "result": segments} # ================================================================== # Symbol Handlers # ================================================================== def handle_symbols(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) name_filter = params.get("name") type_filter = params.get("type") grep_pattern = compile_grep(params) symbols = [] st = self.program.getSymbolTable() count = 0 skipped = 0 for symbol in st.getAllSymbols(True): if count >= limit: break if name_filter and name_filter.lower() not in symbol.getName().lower(): continue if type_filter and str(symbol.getSymbolType()).lower() != type_filter.lower(): continue if skipped < offset: skipped += 1 continue item = { "name": symbol.getName(), "address": str(symbol.getAddress()), "namespace": symbol.getParentNamespace().getName(), "type": str(symbol.getSymbolType()), "isPrimary": symbol.isPrimary(), "isExternal": symbol.isExternal(), } if not grep_matches_item(item, grep_pattern): continue symbols.append(item) count += 1 return { "success": True, "result": symbols, "offset": offset, "limit": limit, "_links": { "self": make_link("/symbols"), "imports": make_link("/symbols/imports"), "exports": make_link("/symbols/exports"), }, } def handle_imports(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) grep_pattern = compile_grep(params) imports = [] count = 0 skipped = 0 for symbol in self.program.getSymbolTable().getExternalSymbols(): if count >= limit: break if skipped < offset: skipped += 1 continue item = { "name": symbol.getName(), "address": str(symbol.getAddress()), "namespace": symbol.getParentNamespace().getName(), } if not grep_matches_item(item, grep_pattern): continue imports.append(item) count += 1 return {"success": True, "result": imports, "offset": offset, "limit": limit} def handle_exports(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) grep_pattern = compile_grep(params) exports = [] count = 0 skipped = 0 for symbol in self.program.getSymbolTable().getAllSymbols(True): if count >= limit: break if not symbol.isExternalEntryPoint(): continue if skipped < offset: skipped += 1 continue item = { "name": symbol.getName(), "address": str(symbol.getAddress()), } if not grep_matches_item(item, grep_pattern): continue exports.append(item) count += 1 return {"success": True, "result": exports, "offset": offset, "limit": limit} # ================================================================== # Cross-References Handler # ================================================================== def handle_xrefs(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) to_addr_str = params.get("to_addr") from_addr_str = params.get("from_addr") type_filter = params.get("type") limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) grep_pattern = compile_grep(params) if not to_addr_str and not from_addr_str: return {"success": False, "error": { "code": "MISSING_PARAMETER", "message": "Either 'to_addr' or 'from_addr' parameter is required" }} xrefs = [] ref_mgr = self.program.getReferenceManager() try: if to_addr_str: addr = self.program.getAddressFactory().getAddress(to_addr_str) refs = ref_mgr.getReferencesTo(addr) count = 0 skipped = 0 while refs.hasNext() and count < limit: ref = refs.next() if type_filter and self._get_ref_type_name(ref.getReferenceType()).lower() != type_filter.lower(): continue if skipped < offset: skipped += 1 continue item = self._build_xref_info(ref) if not grep_matches_item(item, grep_pattern): continue xrefs.append(item) count += 1 if from_addr_str: addr = self.program.getAddressFactory().getAddress(from_addr_str) refs = ref_mgr.getReferencesFrom(addr) count = 0 skipped = 0 for ref in refs: if count >= limit: break if type_filter and self._get_ref_type_name(ref.getReferenceType()).lower() != type_filter.lower(): continue if skipped < offset: skipped += 1 continue item = self._build_xref_info(ref) if not grep_matches_item(item, grep_pattern): continue xrefs.append(item) count += 1 return {"success": True, "result": xrefs, "offset": offset, "limit": limit} except Exception as e: return {"success": False, "error": {"code": "XREF_ERROR", "message": str(e)}} # ================================================================== # Classes & Namespaces Handlers # ================================================================== def handle_classes(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) class_names = set() for symbol in self.program.getSymbolTable().getAllSymbols(True): ns = symbol.getParentNamespace() if ns and not ns.isGlobal() and ns.getSymbol().getSymbolType().isNamespace(): class_names.add(ns.getName(True)) sorted_names = sorted(class_names) start = min(offset, len(sorted_names)) end = min(offset + limit, len(sorted_names)) classes = [] for name in sorted_names[start:end]: info = {"name": name} if "::" in name: info["namespace"] = name[:name.rfind("::")] info["simpleName"] = name[name.rfind("::") + 2:] else: info["namespace"] = "global" info["simpleName"] = name classes.append(info) return { "success": True, "result": classes, "size": len(sorted_names), "offset": offset, "limit": limit, } def handle_namespaces(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) namespaces = set() for symbol in self.program.getSymbolTable().getAllSymbols(True): ns = symbol.getParentNamespace() if ns and not ns.isGlobal(): namespaces.add(ns.getName(True)) sorted_ns = sorted(namespaces) start = min(offset, len(sorted_ns)) end = min(offset + limit, len(sorted_ns)) return { "success": True, "result": sorted_ns[start:end], "size": len(sorted_ns), "offset": offset, "limit": limit, } # ================================================================== # Structs Handlers # ================================================================== def handle_structs(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) # Detail lookup by name name_lookup = params.get("name") if name_lookup: return self._handle_struct_detail(name_lookup) # List structs limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) category_filter = params.get("category") grep_pattern = compile_grep(params) from ghidra.program.model.data import Structure, Union structs = [] dtm = self.program.getDataTypeManager() count = 0 skipped = 0 it = dtm.getAllDataTypes() while it.hasNext() and count < limit: dt = it.next() if not isinstance(dt, (Structure, Union)): continue if category_filter and category_filter.lower() not in dt.getCategoryPath().getPath().lower(): continue if skipped < offset: skipped += 1 continue item = { "name": dt.getName(), "category": dt.getCategoryPath().getPath(), "path": dt.getPathName(), "size": dt.getLength(), "type": "struct" if isinstance(dt, Structure) else "union", "numFields": dt.getNumComponents(), "_links": {"self": make_link("/structs?name=%s" % dt.getName())}, } if not grep_matches_item(item, grep_pattern): continue structs.append(item) count += 1 return {"success": True, "result": structs, "offset": offset, "limit": limit} def _handle_struct_detail(self, name): from ghidra.program.model.data import Composite dt = self._find_struct(name) if not dt: return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}} result = { "name": dt.getName(), "category": dt.getCategoryPath().getPath(), "path": dt.getPathName(), "size": dt.getLength(), "description": dt.getDescription() or "", } if isinstance(dt, Composite): fields = [] for comp in dt.getComponents(): field = { "name": comp.getFieldName(), "type": comp.getDataType().getName(), "typePath": comp.getDataType().getPathName(), "offset": comp.getOffset(), "length": comp.getLength(), } if comp.getComment(): field["comment"] = comp.getComment() fields.append(field) result["fields"] = fields result["numFields"] = len(fields) result["_links"] = { "self": make_link("/structs?name=%s" % name), "structs": make_link("/structs"), } return {"success": True, "result": result} def handle_struct_create(self, exchange): if not self.program: return self._no_program() body = parse_json_body(exchange) name = body.get("name") if not name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}} category = body.get("category", "/") description = body.get("description", "") try: from ghidra.program.model.data import StructureDataType, CategoryPath, DataTypeConflictHandler cat_path = CategoryPath(category) new_struct = StructureDataType(cat_path, name, 0) if description: new_struct.setDescription(description) result_holder = [None] def do_create(): dtm = self.program.getDataTypeManager() result_holder[0] = dtm.addDataType(new_struct, DataTypeConflictHandler.DEFAULT_HANDLER) with_transaction(self.program, "Create struct", do_create) dt = result_holder[0] return ({"success": True, "result": { "name": dt.getName() if dt else name, "path": dt.getPathName() if dt else "%s/%s" % (category, name), "category": category, "size": 0, "message": "Struct created successfully", }}, 201) except Exception as e: return {"success": False, "error": {"code": "STRUCT_ERROR", "message": str(e)}} def handle_struct_addfield(self, exchange): if not self.program: return self._no_program() body = parse_json_body(exchange) struct_name = body.get("struct") field_name = body.get("fieldName") field_type = body.get("fieldType") if not struct_name or not field_name or not field_type: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct', 'fieldName', or 'fieldType'"}} struct = self._find_struct(struct_name) if not struct: return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}} dtm = self.program.getDataTypeManager() dt = resolve_data_type(dtm, field_type) if not dt: return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % field_type}} field_offset = body.get("offset") comment = body.get("comment") result_info = {} def do_add(): if field_offset is not None: struct.insertAtOffset(int(field_offset), dt, dt.getLength(), field_name, comment) result_info["offset"] = int(field_offset) else: struct.add(dt, dt.getLength(), field_name, comment) result_info["offset"] = struct.getLength() - dt.getLength() result_info["length"] = dt.getLength() result_info["structSize"] = struct.getLength() try: with_transaction(self.program, "Add struct field", do_add) return {"success": True, "result": { "struct": struct_name, "fieldName": field_name, "fieldType": field_type, "offset": result_info.get("offset", 0), "length": result_info.get("length", 0), "structSize": result_info.get("structSize", 0), "message": "Field added successfully", }} except Exception as e: return {"success": False, "error": {"code": "FIELD_ERROR", "message": str(e)}} def handle_struct_updatefield(self, exchange): if not self.program: return self._no_program() body = parse_json_body(exchange) struct_name = body.get("struct") if not struct_name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct'"}} struct = self._find_struct(struct_name) if not struct: return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}} field_name = body.get("fieldName") field_offset = body.get("fieldOffset") new_name = body.get("newName") new_type = body.get("newType") new_comment = body.get("newComment") if not new_name and not new_type and new_comment is None: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide at least one of: newName, newType, newComment"}} # Find the component component = None if field_offset is not None: component = struct.getComponentAt(int(field_offset)) elif field_name: for comp in struct.getComponents(): if comp.getFieldName() == field_name: component = comp break if not component: return {"success": False, "error": {"code": "FIELD_NOT_FOUND", "message": "Field not found"}} orig_name = component.getFieldName() orig_type = component.getDataType().getName() orig_comment = component.getComment() def do_update(): if new_name: component.setFieldName(new_name) if new_comment is not None: component.setComment(new_comment) if new_type: dtm = self.program.getDataTypeManager() dt = resolve_data_type(dtm, new_type) if dt: component.setDataType(dt) try: with_transaction(self.program, "Update struct field", do_update) return {"success": True, "result": { "struct": struct_name, "offset": component.getOffset(), "originalName": orig_name, "originalType": orig_type, "originalComment": orig_comment, "newName": new_name or orig_name, "newType": new_type or orig_type, "newComment": new_comment if new_comment is not None else orig_comment, "length": component.getLength(), "message": "Field updated successfully", }} except Exception as e: return {"success": False, "error": {"code": "UPDATE_ERROR", "message": str(e)}} def handle_struct_delete(self, exchange): if not self.program: return self._no_program() body = parse_json_body(exchange) name = body.get("name") if not name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}} dt = self._find_struct(name) if not dt: return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}} category = dt.getCategoryPath().getPath() path = dt.getPathName() def do_delete(): dtm = self.program.getDataTypeManager() dtm.remove(dt, getMonitor()) try: with_transaction(self.program, "Delete struct", do_delete) return {"success": True, "result": { "name": name, "path": path, "category": category, "message": "Struct deleted successfully", }} except Exception as e: return {"success": False, "error": {"code": "DELETE_ERROR", "message": str(e)}} # ================================================================== # Analysis Handlers # ================================================================== def handle_analysis_info(self, exchange): if not self.program: return self._no_program() result = { "program": self.program.getName(), "processor": str(self.program.getLanguage().getProcessor()), "addressSize": self.program.getAddressFactory().getDefaultAddressSpace().getSize(), "programLanguage": str(self.program.getLanguage()), "availableAnalysis": ["callgraph", "xrefs", "decompile", "dataflow"], "_links": { "self": make_link("/analysis"), "callgraph": make_link("/analysis/callgraph"), "dataflow": make_link("/analysis/dataflow"), "program": make_link("/program"), }, } return {"success": True, "result": result} def handle_analysis_run(self, exchange): if not self.program: return self._no_program() try: # In headless mode, try to re-run auto-analysis from ghidra.app.script import GhidraScriptUtil analyzeAll(self.program) return {"success": True, "result": { "program": self.program.getName(), "analysis_triggered": True, "message": "Analysis initiated on program", }} except Exception as e: # analyzeAll may not be available; graceful fallback return {"success": True, "result": { "program": self.program.getName(), "analysis_triggered": False, "message": "Analysis request received (headless mode: %s)" % str(e), }} def handle_callgraph(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) func_name = params.get("name") func_addr = params.get("address") max_depth = parse_int(params.get("max_depth"), 3) start_func = None if func_addr: start_func = self._find_function_at(func_addr) elif func_name: start_func = self._find_function_by_name(func_name) else: # Use first entry point entry_iter = self.program.getSymbolTable().getExternalEntryPointIterator() if entry_iter.hasNext(): start_func = self.program.getFunctionManager().getFunctionAt(entry_iter.next()) if not start_func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found for callgraph"}} nodes = [] edges = [] visited = set() self._build_callgraph(start_func, nodes, edges, visited, 0, max_depth) graph = { "root": start_func.getName(), "rootAddress": str(start_func.getEntryPoint()), "maxDepth": max_depth, "nodes": nodes, "edges": edges, "nodeCount": len(nodes), "edgeCount": len(edges), } return {"success": True, "result": graph} def _build_callgraph(self, func, nodes, edges, visited, depth, max_depth): func_id = str(func.getEntryPoint()) if func_id in visited: return visited.add(func_id) nodes.append({ "id": func_id, "name": func.getName(), "address": func_id, "depth": depth, }) if depth >= max_depth: return ref_mgr = self.program.getReferenceManager() body = func.getBody() addr_iter = body.getAddresses(True) while addr_iter.hasNext(): addr = addr_iter.next() refs = ref_mgr.getReferencesFrom(addr) for ref in refs: if ref.getReferenceType().isCall(): called = self.program.getFunctionManager().getFunctionAt(ref.getToAddress()) if called: edges.append({ "from": func_id, "to": str(called.getEntryPoint()), "type": "call", "callSite": str(addr), }) self._build_callgraph(called, nodes, edges, visited, depth + 1, max_depth) def handle_dataflow(self, exchange): if not self.program: return self._no_program() params = parse_query_params(exchange) addr_str = params.get("address") if not addr_str: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}} direction = params.get("direction", "forward") max_steps = parse_int(params.get("max_steps"), 50) try: addr = self.program.getAddressFactory().getAddress(addr_str) func = self.program.getFunctionManager().getFunctionContaining(addr) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function containing address: %s" % addr_str}} # Simplified dataflow: walk instructions from the starting address listing = self.program.getListing() steps = [] current = addr step_count = 0 if direction == "forward": inst_iter = listing.getInstructions(addr, True) else: inst_iter = listing.getInstructions(addr, False) while inst_iter.hasNext() and step_count < max_steps: instr = inst_iter.next() # Stay within function body if not func.getBody().contains(instr.getAddress()): break full_repr = str(instr) steps.append({ "address": str(instr.getAddress()), "instruction": full_repr, "mnemonic": instr.getMnemonicString(), "bytes": bytes_to_hex(instr.getBytes()), }) step_count += 1 return { "success": True, "result": { "start_address": addr_str, "function": func.getName(), "direction": direction, "max_steps": max_steps, "total_steps": len(steps), "steps": steps, "sources": [], "sinks": [], } } except Exception as e: return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}} # ================================================================== # Symbol CRUD Handlers # ================================================================== def handle_symbol_create(self, exchange): """POST /symbols - Create a new label/symbol.""" if not self.program: return self._no_program() body = parse_json_body(exchange) name = body.get("name", "") address = body.get("address", "") if not name or not address: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Both 'name' and 'address' are required"}} try: addr = self.program.getAddressFactory().getAddress(address) st = self.program.getSymbolTable() def do_create(): st.createLabel(addr, name, SourceType.USER_DEFINED) with_transaction(self.program, "Create symbol", do_create) return {"success": True, "result": { "name": name, "address": address, "message": "Symbol created successfully", }} except Exception as e: return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}} def handle_symbol_rename(self, exchange, addr_str): """PATCH /symbols/{address} - Rename primary symbol at address.""" if not self.program: return self._no_program() body = parse_json_body(exchange) new_name = body.get("name", "") if not new_name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}} try: addr = self.program.getAddressFactory().getAddress(addr_str) st = self.program.getSymbolTable() symbol = st.getPrimarySymbol(addr) if not symbol: return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}} old_name = symbol.getName() def do_rename(): symbol.setName(new_name, SourceType.USER_DEFINED) with_transaction(self.program, "Rename symbol", do_rename) return {"success": True, "result": { "address": addr_str, "oldName": old_name, "newName": new_name, "message": "Symbol renamed successfully", }} except Exception as e: return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}} def handle_symbol_delete(self, exchange, addr_str): """DELETE /symbols/{address} - Delete primary symbol at address.""" if not self.program: return self._no_program() try: addr = self.program.getAddressFactory().getAddress(addr_str) st = self.program.getSymbolTable() symbol = st.getPrimarySymbol(addr) if not symbol: return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}} name = symbol.getName() def do_delete(): symbol.delete() with_transaction(self.program, "Delete symbol", do_delete) return {"success": True, "result": { "address": addr_str, "name": name, "message": "Symbol deleted successfully", }} except Exception as e: return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}} # ================================================================== # Variable Rename Handler # ================================================================== def handle_variable_rename(self, exchange, addr_str, var_name): """PATCH /functions/{address}/variables/{name} - Rename/retype a variable.""" if not self.program: return self._no_program() body = parse_json_body(exchange) new_name = body.get("name", "") new_type = body.get("data_type") if not new_name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}} try: # URL-decode the variable name from java.net import URLDecoder decoded_name = URLDecoder.decode(var_name, "UTF-8") func = self._find_function_at(addr_str) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} # Search parameters and local variables target_var = None for param in func.getParameters(): if param.getName() == decoded_name: target_var = param break if not target_var: for var in func.getAllVariables(): if var.getName() == decoded_name: target_var = var break if not target_var: return {"success": False, "error": {"code": "NOT_FOUND", "message": "Variable '%s' not found in function" % decoded_name}} old_name = target_var.getName() def do_rename(): target_var.setName(new_name, SourceType.USER_DEFINED) if new_type: dtm = self.program.getDataTypeManager() dt = resolve_data_type(dtm, new_type) if dt: target_var.setDataType(dt, SourceType.USER_DEFINED) with_transaction(self.program, "Rename variable", do_rename) return {"success": True, "result": { "function": addr_str, "oldName": old_name, "newName": new_name, "message": "Variable renamed successfully", }} except Exception as e: return {"success": False, "error": {"code": "VARIABLE_ERROR", "message": str(e)}} # ================================================================== # Variables Handler # ================================================================== def handle_variables(self, exchange): """GET /variables - List global and function variables.""" if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) global_only = params.get("global_only", "false").lower() == "true" search = params.get("search", "") grep_pattern = compile_grep(params) variables = [] count = 0 skipped = 0 # Function variables (parameters + locals) if not global_only: fm = self.program.getFunctionManager() for func in fm.getFunctions(True): if count >= limit: break func_name = func.getName() func_addr = str(func.getEntryPoint()) for param in func.getParameters(): if count >= limit: break var_name = param.getName() if search and search.lower() not in var_name.lower(): continue if skipped < offset: skipped += 1 continue item = { "name": var_name, "type": str(param.getDataType()), "storage": str(param.getVariableStorage()), "scope": "parameter", "function": func_name, "functionAddress": func_addr, } if not grep_matches_item(item, grep_pattern): continue variables.append(item) count += 1 for var in func.getLocalVariables(): if count >= limit: break var_name = var.getName() if search and search.lower() not in var_name.lower(): continue if skipped < offset: skipped += 1 continue item = { "name": var_name, "type": str(var.getDataType()), "storage": str(var.getVariableStorage()), "scope": "local", "function": func_name, "functionAddress": func_addr, } if not grep_matches_item(item, grep_pattern): continue variables.append(item) count += 1 # Global variables (defined data with symbol names) listing = self.program.getListing() st = self.program.getSymbolTable() for data in listing.getDefinedData(True): if count >= limit: break addr = data.getAddress() symbol = st.getPrimarySymbol(addr) if not symbol: continue sym_name = symbol.getName() # Skip auto-generated names if sym_name.startswith("DAT_") or sym_name.startswith("s_"): continue if search and search.lower() not in sym_name.lower(): continue if skipped < offset: skipped += 1 continue item = { "name": sym_name, "address": str(addr), "type": str(data.getDataType()), "scope": "global", "size": data.getLength(), } if not grep_matches_item(item, grep_pattern): continue variables.append(item) count += 1 return {"success": True, "result": variables, "offset": offset, "limit": limit} # ================================================================== # Bookmarks Handlers # ================================================================== def handle_bookmarks(self, exchange): """GET /bookmarks - List bookmarks with optional filtering.""" if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) type_filter = params.get("type") category_filter = params.get("category") grep_pattern = compile_grep(params) bm = self.program.getBookmarkManager() bookmarks = [] count = 0 skipped = 0 # Get bookmark types to iterate if type_filter: bm_types = [type_filter] else: bm_types = [str(t) for t in bm.getBookmarkTypes()] for btype in bm_types: if count >= limit: break try: it = bm.getBookmarksIterator(btype) except: continue while it.hasNext() and count < limit: bookmark = it.next() if category_filter and bookmark.getCategory() != category_filter: continue if skipped < offset: skipped += 1 continue item = { "address": str(bookmark.getAddress()), "type": bookmark.getTypeString(), "category": bookmark.getCategory(), "comment": bookmark.getComment(), } if not grep_matches_item(item, grep_pattern): continue bookmarks.append(item) count += 1 return {"success": True, "result": bookmarks, "offset": offset, "limit": limit} def handle_bookmark_create(self, exchange): """POST /bookmarks - Create a bookmark.""" if not self.program: return self._no_program() body = parse_json_body(exchange) address = body.get("address", "") btype = body.get("type", "Note") category = body.get("category", "") comment = body.get("comment", "") if not address: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'address' is required"}} try: addr = self.program.getAddressFactory().getAddress(address) bm = self.program.getBookmarkManager() def do_create(): bm.setBookmark(addr, btype, category, comment) with_transaction(self.program, "Create bookmark", do_create) return {"success": True, "result": { "address": address, "type": btype, "category": category, "comment": comment, "message": "Bookmark created successfully", }} except Exception as e: return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}} def handle_bookmark_delete(self, exchange, addr_str): """DELETE /bookmarks/{address} - Delete all bookmarks at address.""" if not self.program: return self._no_program() try: addr = self.program.getAddressFactory().getAddress(addr_str) bm = self.program.getBookmarkManager() removed = [] def do_delete(): for bookmark in list(bm.getBookmarks(addr)): removed.append(bookmark.getTypeString()) bookmark.remove() if hasattr(bookmark, 'remove') else bm.removeBookmark(bookmark) with_transaction(self.program, "Delete bookmarks", do_delete) return {"success": True, "result": { "address": addr_str, "removedTypes": removed, "count": len(removed), "message": "Bookmarks deleted successfully", }} except Exception as e: return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}} # ================================================================== # Enum Handlers # ================================================================== def handle_enums(self, exchange): """GET /datatypes/enums - List enum data types.""" if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) grep_pattern = compile_grep(params) from ghidra.program.model.data import Enum as GhidraEnum dtm = self.program.getDataTypeManager() enums = [] count = 0 skipped = 0 for dt in dtm.getAllDataTypes(): if count >= limit: break if not isinstance(dt, GhidraEnum): continue if skipped < offset: skipped += 1 continue # Get enum members members = [] for name in dt.getNames(): members.append({"name": name, "value": dt.getValue(name)}) item = { "name": dt.getName(), "category": str(dt.getCategoryPath()), "size": dt.getLength(), "members": members, } if not grep_matches_item(item, grep_pattern): continue enums.append(item) count += 1 return {"success": True, "result": enums, "offset": offset, "limit": limit} def handle_enum_create(self, exchange): """POST /datatypes/enums - Create a new enum.""" if not self.program: return self._no_program() body = parse_json_body(exchange) name = body.get("name", "") size = int(body.get("size", 4)) if not name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' is required"}} try: from ghidra.program.model.data import EnumDataType, CategoryPath dtm = self.program.getDataTypeManager() new_enum = EnumDataType(name, size) def do_create(): dtm.addDataType(new_enum, None) with_transaction(self.program, "Create enum", do_create) return {"success": True, "result": { "name": name, "size": size, "message": "Enum created successfully", }} except Exception as e: return {"success": False, "error": {"code": "ENUM_ERROR", "message": str(e)}} # ================================================================== # Typedef Handlers # ================================================================== def handle_typedefs(self, exchange): """GET /datatypes/typedefs - List typedef data types.""" if not self.program: return self._no_program() params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) grep_pattern = compile_grep(params) from ghidra.program.model.data import TypeDef dtm = self.program.getDataTypeManager() typedefs = [] count = 0 skipped = 0 for dt in dtm.getAllDataTypes(): if count >= limit: break if not isinstance(dt, TypeDef): continue if skipped < offset: skipped += 1 continue item = { "name": dt.getName(), "category": str(dt.getCategoryPath()), "baseType": dt.getBaseDataType().getName() if dt.getBaseDataType() else None, "size": dt.getLength(), } if not grep_matches_item(item, grep_pattern): continue typedefs.append(item) count += 1 return {"success": True, "result": typedefs, "offset": offset, "limit": limit} def handle_typedef_create(self, exchange): """POST /datatypes/typedefs - Create a new typedef.""" if not self.program: return self._no_program() body = parse_json_body(exchange) name = body.get("name", "") base_type_name = body.get("base_type", "") if not name or not base_type_name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' and 'base_type' are required"}} try: from ghidra.program.model.data import TypedefDataType dtm = self.program.getDataTypeManager() # Use the shared resolver which handles builtins + path lookups base_dt = resolve_data_type(dtm, base_type_name) if not base_dt: return {"success": False, "error": {"code": "NOT_FOUND", "message": "Base type not found: %s" % base_type_name}} new_typedef = TypedefDataType(name, base_dt) def do_create(): dtm.addDataType(new_typedef, None) with_transaction(self.program, "Create typedef", do_create) return {"success": True, "result": { "name": name, "baseType": base_type_name, "message": "Typedef created successfully", }} except Exception as e: return {"success": False, "error": {"code": "TYPEDEF_ERROR", "message": str(e)}} # ================================================================== # Legacy Compatibility # ================================================================== def handle_decompile_legacy(self, exchange): """Handle GET /decompile?name=X or ?address=X (backwards compat).""" if not self.program: return self._no_program() params = parse_query_params(exchange) name = params.get("name") or params.get("address") if not name: return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name' or 'address' parameter"}} # Try by name first, then by address func = self._find_function_by_name(name) if not func: func = self._find_function_at(name) if not func: return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}} result = self.decompiler.decompileFunction(func, 30, getMonitor()) if result and result.decompileCompleted(): code = result.getDecompiledFunction().getC() return { "success": True, "name": func.getName(), "address": str(func.getEntryPoint()), "decompiled": code, } return {"success": False, "error": {"code": "DECOMPILE_FAILED", "message": "Decompilation failed"}} # ======================================================================== # Server Startup # ======================================================================== def run_server(port, program, decompiler): """Start the HTTP server with a single catch-all handler.""" server = HttpServer.create(InetSocketAddress(port), 0) server.createContext("/", GhydraMCPHandler(program, decompiler)) server.setExecutor(Executors.newCachedThreadPool()) server.start() println("[GhydraMCP] HTTP server started on port %d" % port) return server # ======================================================================== # Main Entry Point # ======================================================================== def main(): port = DEFAULT_PORT # Parse port from script arguments args = getScriptArgs() if args and len(args) > 0: try: port = int(args[0]) except: println("Invalid port number, using default: %d" % DEFAULT_PORT) # Initialize decompiler decompiler = DecompInterface() decompiler.openProgram(currentProgram) println("=========================================") println(" GhydraMCP Headless HTTP Server") println("=========================================") println(" API Version: %s (compat: %d)" % (API_VERSION_STRING, API_VERSION)) println(" Port: %d" % port) println(" Program: %s" % (currentProgram.getName() if currentProgram else "None")) println(" Script: Python/Jython (Full API)") println(" Routes: %d" % len(ROUTES)) println("=========================================") server = run_server(port, currentProgram, decompiler) println("") println("GhydraMCP Server running. Press Ctrl+C to stop.") println("API available at: http://localhost:%d/" % port) # Keep the script running try: while True: time.sleep(1) except KeyboardInterrupt: server.stop(0) println("[GhydraMCP] Server stopped.") # Run main()