diff --git a/docker/GhydraMCPServer.py b/docker/GhydraMCPServer.py index 142f591..216289b 100644 --- a/docker/GhydraMCPServer.py +++ b/docker/GhydraMCPServer.py @@ -1,5 +1,6 @@ # GhydraMCPServer.py - Headless Ghidra script for GhydraMCP HTTP API -# Python/Jython scripts don't require OSGi bundle registration +# Full API parity with the Java plugin implementation. +# Python 2 / Jython compatible (no f-strings, no readAllBytes). # # Usage: analyzeHeadless -import -postScript GhydraMCPServer.py [port] # @@ -8,302 +9,2091 @@ #@menupath #@toolbar +# === Java imports === from com.sun.net.httpserver import HttpServer, HttpHandler -from java.net import InetSocketAddress +from java.net import InetSocketAddress, URLDecoder from java.util.concurrent import Executors -from java.io import OutputStream -from ghidra.app.decompiler import DecompInterface -from ghidra.program.model.listing import Function -import json -import threading +from java.io import OutputStream, BufferedReader, InputStreamReader +# === Ghidra imports === +from ghidra.app.decompiler import DecompInterface +from ghidra.program.model.symbol import SourceType +from ghidra.program.model.listing import CodeUnit + +# === Python imports === +import json +import re +import threading +import time + +# === Constants === API_VERSION = 2 # Integer for MCP client compatibility (minimum expected: 2) API_VERSION_STRING = "2.1" DEFAULT_PORT = 8192 +# === Thread-safe transaction lock === +_tx_lock = threading.Lock() + + +# ======================================================================== +# Utility Functions +# ======================================================================== + +def url_decode(s): + """URL-decode a string using Java's URLDecoder.""" + try: + return URLDecoder.decode(s, "UTF-8") + except: + return s + + +def parse_query_params(exchange): + """Parse query string from an HttpExchange into a dict.""" + params = {} + query = exchange.getRequestURI().getQuery() + if query: + for part in query.split("&"): + if "=" in part: + k, v = part.split("=", 1) + params[k] = url_decode(v) + else: + params[part] = "" + return params + + +def read_request_body(exchange): + """Read the request body as a string (Jython-compatible, no readAllBytes).""" + reader = BufferedReader(InputStreamReader(exchange.getRequestBody(), "UTF-8")) + lines = [] + line = reader.readLine() + while line is not None: + lines.append(line) + line = reader.readLine() + reader.close() + return "\n".join(lines) + + +def parse_json_body(exchange): + """Read and parse JSON body from an HTTP request.""" + body = read_request_body(exchange) + if not body or not body.strip(): + return {} + try: + return json.loads(body) + except: + return {} + + +def parse_int(value, default): + """Safely parse an integer with a default fallback.""" + if value is None: + return default + try: + return int(value) + except (ValueError, TypeError): + return default + + +def bytes_to_hex(byte_array): + """Convert a Java/Python byte array to a hex string.""" + return ''.join(['%02x' % (b & 0xff) for b in byte_array]) + + +def hex_to_bytes(hex_str): + """Convert a hex string to a list of integer byte values.""" + result = [] + for i in range(0, len(hex_str), 2): + result.append(int(hex_str[i:i+2], 16)) + return result + + +def make_link(href): + """Create a HATEOAS link dict.""" + return {"href": href} + + +def with_transaction(program, desc, fn): + """Execute fn inside a thread-safe Ghidra transaction.""" + _tx_lock.acquire() + try: + tx_id = program.startTransaction(desc) + try: + result = fn() + program.endTransaction(tx_id, True) + return result + except: + program.endTransaction(tx_id, False) + raise + finally: + _tx_lock.release() + + +# ======================================================================== +# Data Type Resolution +# ======================================================================== + +_BUILTIN_TYPES = None + +def _init_builtin_types(): + """Lazily initialize the builtin data type map.""" + global _BUILTIN_TYPES + if _BUILTIN_TYPES is not None: + return _BUILTIN_TYPES + _BUILTIN_TYPES = {} + try: + from ghidra.program.model.data import ( + ByteDataType, WordDataType, DWordDataType, QWordDataType, + FloatDataType, DoubleDataType, CharDataType, + ShortDataType, IntegerDataType, LongDataType, LongLongDataType, + PointerDataType, BooleanDataType, + StringDataType, UnicodeDataType, + Undefined1DataType, Undefined2DataType, + Undefined4DataType, Undefined8DataType + ) + pairs = [ + ("byte", ByteDataType), ("word", WordDataType), + ("dword", DWordDataType), ("qword", QWordDataType), + ("float", FloatDataType), ("double", DoubleDataType), + ("char", CharDataType), ("short", ShortDataType), + ("int", IntegerDataType), ("long", LongDataType), + ("longlong", LongLongDataType), + ("pointer", PointerDataType), ("bool", BooleanDataType), + ("boolean", BooleanDataType), + ("string", StringDataType), ("unicode", UnicodeDataType), + ("undefined1", Undefined1DataType), ("undefined2", Undefined2DataType), + ("undefined4", Undefined4DataType), ("undefined8", Undefined8DataType), + ] + for name, cls in pairs: + try: + _BUILTIN_TYPES[name] = cls.dataType + except: + pass + except ImportError: + pass + # Also try unsigned variants + try: + from ghidra.program.model.data import ( + UnsignedShortDataType, UnsignedIntegerDataType, + UnsignedLongDataType, UnsignedLongLongDataType + ) + for name, cls in [ + ("ushort", UnsignedShortDataType), ("uint", UnsignedIntegerDataType), + ("ulong", UnsignedLongDataType), ("ulonglong", UnsignedLongLongDataType), + ]: + try: + _BUILTIN_TYPES[name] = cls.dataType + except: + pass + except ImportError: + pass + return _BUILTIN_TYPES + + +def resolve_data_type(dtm, type_name): + """Resolve a type name string to a Ghidra DataType object.""" + if not type_name: + return None + # Direct path lookup + dt = dtm.getDataType("/" + type_name) + if dt is not None: + return dt + # Builtin types map + builtins = _init_builtin_types() + dt = builtins.get(type_name.lower()) + if dt is not None: + return dt + # Search all data types (expensive fallback) + it = dtm.getAllDataTypes() + while it.hasNext(): + candidate = it.next() + if candidate.getName().lower() == type_name.lower(): + return candidate + return None + + +# ======================================================================== +# Comment type mapping +# ======================================================================== + +COMMENT_TYPE_MAP = { + "pre": CodeUnit.PRE_COMMENT, + "post": CodeUnit.POST_COMMENT, + "eol": CodeUnit.EOL_COMMENT, + "plate": CodeUnit.PLATE_COMMENT, + "repeatable": CodeUnit.REPEATABLE_COMMENT, +} + + +# ======================================================================== +# Route Table +# ======================================================================== + +ROUTES = [ + # (HTTP method, path regex, handler method name) + + # Root / Meta + ("GET", r"^/$", "handle_root"), + ("GET", r"^/info$", "handle_info"), + ("GET", r"^/program$", "handle_program"), + + # Headless stubs (cursor-dependent, not available headless) + ("GET", r"^/address$", "handle_address_stub"), + ("GET", r"^/function$", "handle_function_stub"), + + # Functions - by-name variants (must precede address patterns) + ("GET", r"^/functions/by-name/([^/]+)/decompile$", "handle_decompile_by_name"), + ("GET", r"^/functions/by-name/([^/]+)/disassembly$", "handle_disassembly_by_name"), + ("PUT", r"^/functions/by-name/([^/]+)/signature$", "handle_signature_by_name"), + ("PATCH", r"^/functions/by-name/([^/]+)$", "handle_patch_function_by_name"), + ("GET", r"^/functions/by-name/([^/]+)$", "handle_function_by_name"), + + # Functions - by address + ("GET", r"^/functions/([^/]+)/decompile$", "handle_decompile"), + ("GET", r"^/functions/([^/]+)/disassembly$", "handle_disassembly"), + ("GET", r"^/functions/([^/]+)/variables$", "handle_function_variables"), + ("PUT", r"^/functions/([^/]+)/signature$", "handle_signature"), + ("PATCH", r"^/functions/([^/]+)$", "handle_patch_function"), + ("GET", r"^/functions/([^/]+)$", "handle_function_detail"), + + # Functions list / create + ("GET", r"^/functions/?$", "handle_functions_list"), + ("POST", r"^/functions/?$", "handle_create_function"), + + # Data + ("POST", r"^/data/delete$", "handle_data_delete"), + ("POST", r"^/data/type$", "handle_data_type"), + ("GET", r"^/data/strings$", "handle_strings"), + ("GET", r"^/data/?$", "handle_data_list"), + ("POST", r"^/data/?$", "handle_data_create"), + + # Strings + ("GET", r"^/strings$", "handle_strings"), + + # Memory + ("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"), + ("GET", r"^/memory/blocks$", "handle_memory_blocks"), + ("GET", r"^/memory$", "handle_memory_read"), + ("PATCH", r"^/programs/current/memory/([^/]+)$", "handle_memory_write"), + + # Segments + ("GET", r"^/segments$", "handle_segments"), + + # Symbols + ("GET", r"^/symbols/imports$", "handle_imports"), + ("GET", r"^/symbols/exports$", "handle_exports"), + ("GET", r"^/symbols$", "handle_symbols"), + + # Cross-references + ("GET", r"^/xrefs$", "handle_xrefs"), + + # Classes & Namespaces + ("GET", r"^/classes$", "handle_classes"), + ("GET", r"^/namespaces$", "handle_namespaces"), + + # Structs + ("POST", r"^/structs/create$", "handle_struct_create"), + ("POST", r"^/structs/addfield$", "handle_struct_addfield"), + ("POST", r"^/structs/updatefield$", "handle_struct_updatefield"), + ("POST", r"^/structs/delete$", "handle_struct_delete"), + ("GET", r"^/structs$", "handle_structs"), + + # Analysis + ("GET", r"^/analysis/callgraph$", "handle_callgraph"), + ("GET", r"^/analysis/dataflow$", "handle_dataflow"), + ("GET", r"^/analysis$", "handle_analysis_info"), + ("POST", r"^/analysis$", "handle_analysis_run"), + + # Legacy compatibility + ("GET", r"^/decompile$", "handle_decompile_legacy"), +] + + +# ======================================================================== +# HTTP Handler +# ======================================================================== + class GhydraMCPHandler(HttpHandler): + def __init__(self, program, decompiler): self.program = program self.decompiler = decompiler + # Pre-compile route patterns + self.routes = [] + for method, pattern, handler_name in ROUTES: + self.routes.append((method, re.compile(pattern), handler_name)) + + # ------------------------------------------------------------------ + # Dispatch + # ------------------------------------------------------------------ def handle(self, exchange): try: - path = exchange.getRequestURI().getPath() method = exchange.getRequestMethod() + path = exchange.getRequestURI().getPath() - # Route to appropriate handler - if path == "/" or path == "": - response = self.handle_root() - elif path == "/functions": - response = self.handle_functions() - elif path.endswith("/decompile"): - # Handle /functions/{address}/decompile - response = self.handle_decompile_by_path(path) - elif path.startswith("/functions/"): - response = self.handle_function_detail(path) - elif path == "/strings" or path == "/data/strings": - response = self.handle_strings() - elif path == "/info": - response = self.handle_info() - elif path == "/decompile": - response = self.handle_decompile(exchange) - else: - response = {"success": False, "error": "Not found", "path": path} + # CORS preflight + if method == "OPTIONS": + self._send_cors_preflight(exchange) + return - self.send_response(exchange, 200, response) + # Match routes + for route_method, pattern, handler_name in self.routes: + if method != route_method: + continue + match = pattern.match(path) + if match: + handler = getattr(self, handler_name) + groups = match.groups() + decoded = tuple(url_decode(g) for g in groups) + result = handler(exchange, *decoded) + if isinstance(result, tuple): + response, code = result + self._send_response(exchange, code, response) + else: + self._send_response(exchange, 200, result) + return + + # No route matched + self._send_response(exchange, 404, { + "success": False, + "error": {"code": "NOT_FOUND", "message": "Endpoint not found: %s %s" % (method, path)} + }) except Exception as e: - self.send_response(exchange, 500, {"success": False, "error": str(e)}) + try: + self._send_response(exchange, 500, { + "success": False, + "error": {"code": "INTERNAL_ERROR", "message": str(e)} + }) + except: + pass - def send_response(self, exchange, code, data): + def _send_response(self, exchange, code, data): response_bytes = json.dumps(data, indent=2).encode('utf-8') - exchange.getResponseHeaders().set("Content-Type", "application/json") - exchange.getResponseHeaders().set("Access-Control-Allow-Origin", "*") + headers = exchange.getResponseHeaders() + headers.set("Content-Type", "application/json; charset=utf-8") + headers.set("Access-Control-Allow-Origin", "*") + headers.set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS") + headers.set("Access-Control-Allow-Headers", "Content-Type, X-Request-ID") exchange.sendResponseHeaders(code, len(response_bytes)) os = exchange.getResponseBody() os.write(response_bytes) os.close() - def handle_root(self): + def _send_cors_preflight(self, exchange): + headers = exchange.getResponseHeaders() + headers.set("Access-Control-Allow-Origin", "*") + headers.set("Access-Control-Allow-Methods", "GET, POST, PATCH, PUT, DELETE, OPTIONS") + headers.set("Access-Control-Allow-Headers", "Content-Type, X-Request-ID") + headers.set("Access-Control-Max-Age", "86400") + exchange.sendResponseHeaders(204, -1) + exchange.getResponseBody().close() + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _no_program(self): + return {"success": False, "error": {"code": "NO_PROGRAM_LOADED", "message": "No program loaded"}} + + def _find_function_at(self, addr_str): + """Find a function by address string. Falls back to containing function.""" + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + fm = self.program.getFunctionManager() + func = fm.getFunctionAt(addr) + if func is None: + func = fm.getFunctionContaining(addr) + return func + except: + return None + + def _find_function_by_name(self, name): + """Find a function by exact name match.""" + fm = self.program.getFunctionManager() + for func in fm.getFunctions(True): + if func.getName() == name: + return func + return None + + def _get_function(self, identifier, by_name=False): + """Resolve a function by name or address.""" + if by_name: + return self._find_function_by_name(identifier) + return self._find_function_at(identifier) + + def _decompile_function(self, func): + """Decompile a function and return the result dict.""" + result = self.decompiler.decompileFunction(func, 60, getMonitor()) + resp = { + "name": func.getName(), + "address": str(func.getEntryPoint()), + } + if result and result.decompileCompleted(): + decomp = result.getDecompiledFunction() + code = decomp.getC() + resp["decompiled"] = code + resp["decompiled_text"] = code # alias for MCP client compat + resp["ccode"] = code # alias for API spec compat + resp["signature"] = decomp.getSignature() + else: + msg = result.getErrorMessage() if result else "Unknown error" + resp["error"] = "Decompilation failed: %s" % msg + addr = str(func.getEntryPoint()) + resp["_links"] = { + "self": make_link("/functions/%s/decompile" % addr), + "function": make_link("/functions/%s" % addr), + "disassembly": make_link("/functions/%s/disassembly" % addr), + } + return {"success": True, "result": resp} + + def _build_disassembly(self, func): + """Get disassembly instructions for a function.""" + instructions = [] + listing = self.program.getListing() + body = func.getBody() + inst_iter = listing.getInstructions(body, True) + while inst_iter.hasNext(): + instr = inst_iter.next() + full_repr = str(instr) + mnemonic = instr.getMnemonicString() + operands = full_repr[len(mnemonic):].strip() if len(full_repr) > len(mnemonic) else "" + instructions.append({ + "address": str(instr.getAddress()), + "mnemonic": mnemonic, + "operands": operands, + "bytes": bytes_to_hex(instr.getBytes()), + }) + addr = str(func.getEntryPoint()) + # Build text representation for MCP client + lines = [] + for inst in instructions: + lines.append("%s %s %s" % (inst["address"], inst["mnemonic"], inst["operands"])) + disassembly_text = "\n".join(lines) return { "success": True, - "api_version": API_VERSION, - "message": "GhydraMCP API " + API_VERSION_STRING, - "program": self.program.getName() if self.program else None, - "endpoints": ["/", "/info", "/functions", "/functions/", "/strings", "/decompile"], - "_links": { - "self": "/", - "functions": "/functions", - "strings": "/strings", - "info": "/info" + "result": { + "name": func.getName(), + "address": addr, + "instructions": instructions, + "instructionCount": len(instructions), + "disassembly_text": disassembly_text, + "_links": { + "self": make_link("/functions/%s/disassembly" % addr), + "function": make_link("/functions/%s" % addr), + "decompile": make_link("/functions/%s/decompile" % addr), + }, } } - def handle_info(self): - if not self.program: - return {"success": False, "error": "No program loaded"} - return { + def _build_function_info(self, func): + """Build a detailed function info dict.""" + addr = str(func.getEntryPoint()) + info = { + "name": func.getName(), + "address": addr, + "signature": str(func.getSignature()), + "returnType": func.getReturnType().getName(), + "parameterCount": func.getParameterCount(), + "callingConvention": func.getCallingConventionName(), + "isThunk": func.isThunk(), + "isExternal": func.isExternal(), + "bodySize": func.getBody().getNumAddresses(), + } + # Parameters + params = [] + for param in func.getParameters(): + params.append({ + "name": param.getName(), + "type": param.getDataType().getName(), + "ordinal": param.getOrdinal(), + }) + info["parameters"] = params + # Comments + entry = func.getEntryPoint() + listing = self.program.getListing() + cu = listing.getCodeUnitAt(entry) + if cu: + for ctype, cname in [ + (CodeUnit.PRE_COMMENT, "preComment"), + (CodeUnit.POST_COMMENT, "postComment"), + (CodeUnit.EOL_COMMENT, "eolComment"), + (CodeUnit.PLATE_COMMENT, "plateComment"), + ]: + c = cu.getComment(ctype) + if c: + info[cname] = c + func_comment = func.getComment() + if func_comment: + info["comment"] = func_comment + # HATEOAS links + info["_links"] = { + "self": make_link("/functions/%s" % addr), + "decompile": make_link("/functions/%s/decompile" % addr), + "disassembly": make_link("/functions/%s/disassembly" % addr), + "variables": make_link("/functions/%s/variables" % addr), + "xrefs_to": make_link("/xrefs?to_addr=%s" % addr), + "xrefs_from": make_link("/xrefs?from_addr=%s" % addr), + } + return info + + def _build_xref_info(self, ref): + """Build a cross-reference info dict.""" + info = { + "fromAddress": str(ref.getFromAddress()), + "toAddress": str(ref.getToAddress()), + "type": self._get_ref_type_name(ref.getReferenceType()), + "isPrimary": ref.isPrimary(), + } + fm = self.program.getFunctionManager() + from_func = fm.getFunctionContaining(ref.getFromAddress()) + to_func = fm.getFunctionContaining(ref.getToAddress()) + if from_func: + info["fromFunction"] = from_func.getName() + if to_func: + info["toFunction"] = to_func.getName() + return info + + def _get_ref_type_name(self, ref_type): + if ref_type.isCall(): + return "CALL" + if ref_type.isData(): + return "DATA" + if ref_type.isRead(): + return "READ" + if ref_type.isWrite(): + return "WRITE" + if ref_type.isJump(): + return "JUMP" + return str(ref_type) + + def _find_struct(self, name): + """Find a struct/union data type by name.""" + dtm = self.program.getDataTypeManager() + it = dtm.getAllDataTypes() + while it.hasNext(): + dt = it.next() + if dt.getName() == name: + from ghidra.program.model.data import Structure, Union + if isinstance(dt, (Structure, Union)): + return dt + return None + + # ================================================================== + # Root / Meta Handlers + # ================================================================== + + def handle_root(self, exchange): + result = { "success": True, + "api_version": API_VERSION, + "api_version_string": API_VERSION_STRING, + "message": "GhydraMCP Headless API", + "mode": "headless", + } + if self.program: + result["program"] = self.program.getName() + result["file"] = self.program.getExecutablePath() + result["language"] = str(self.program.getLanguageID()) + result["_links"] = { + "self": make_link("/"), + "info": make_link("/info"), + "program": make_link("/program"), + "functions": make_link("/functions"), + "data": make_link("/data"), + "strings": make_link("/strings"), + "memory": make_link("/memory"), + "segments": make_link("/segments"), + "symbols": make_link("/symbols"), + "xrefs": make_link("/xrefs"), + "classes": make_link("/classes"), + "namespaces": make_link("/namespaces"), + "structs": make_link("/structs"), + "analysis": make_link("/analysis"), + } + return result + + def handle_info(self, exchange): + if not self.program: + return self._no_program() + mem = self.program.getMemory() + result = { + "success": True, + "result": { + "name": self.program.getName(), + "path": self.program.getExecutablePath(), + "language": str(self.program.getLanguageID()), + "processor": str(self.program.getLanguage().getProcessor()), + "addressSize": self.program.getAddressFactory().getDefaultAddressSpace().getSize(), + "imageBase": str(self.program.getImageBase()), + "minAddress": str(mem.getMinAddress()), + "maxAddress": str(mem.getMaxAddress()), + "memorySize": mem.getSize(), + "functionCount": self.program.getFunctionManager().getFunctionCount(), + "mode": "headless", + "_links": { + "self": make_link("/info"), + "program": make_link("/program"), + "functions": make_link("/functions"), + }, + } + } + return result + + def handle_program(self, exchange): + if not self.program: + return self._no_program() + mem = self.program.getMemory() + fm = self.program.getFunctionManager() + result = { "name": self.program.getName(), "path": self.program.getExecutablePath(), - "language": str(self.program.getLanguage().getLanguageID()), + "language": str(self.program.getLanguageID()), + "compiler": str(self.program.getCompilerSpec().getCompilerSpecID()), "processor": str(self.program.getLanguage().getProcessor()), "addressSize": self.program.getAddressFactory().getDefaultAddressSpace().getSize(), "imageBase": str(self.program.getImageBase()), + "minAddress": str(mem.getMinAddress()), + "maxAddress": str(mem.getMaxAddress()), + "memorySize": mem.getSize(), + "functionCount": fm.getFunctionCount(), + "_links": { + "self": make_link("/program"), + "functions": make_link("/functions"), + "symbols": make_link("/symbols"), + "segments": make_link("/segments"), + "analysis": make_link("/analysis"), + }, + } + return {"success": True, "result": result} + + # ================================================================== + # Headless Stubs + # ================================================================== + + def handle_address_stub(self, exchange): + return { + "success": False, + "error": { + "code": "HEADLESS_MODE", + "message": "Current address is not available in headless mode. Use /functions or /data with address parameters instead." + } } - def handle_functions(self): + def handle_function_stub(self, exchange): + return { + "success": False, + "error": { + "code": "HEADLESS_MODE", + "message": "Current function is not available in headless mode. Use /functions/{address} instead." + } + } + + # ================================================================== + # Function Handlers + # ================================================================== + + def handle_functions_list(self, exchange): if not self.program: - return {"success": False, "error": "No program loaded"} + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + name_filter = params.get("name") functions = [] fm = self.program.getFunctionManager() - for func in fm.getFunctions(True): # True = forward iteration + total = fm.getFunctionCount() + count = 0 + skipped = 0 + + for func in fm.getFunctions(True): + if count >= limit: + break + # Apply name filter + if name_filter and name_filter.lower() not in func.getName().lower(): + continue + if skipped < offset: + skipped += 1 + continue + addr = str(func.getEntryPoint()) functions.append({ "name": func.getName(), - "address": str(func.getEntryPoint()), + "address": addr, "signature": str(func.getSignature()), + "parameterCount": func.getParameterCount(), + "isThunk": func.isThunk(), + "_links": { + "self": make_link("/functions/%s" % addr), + "decompile": make_link("/functions/%s/decompile" % addr), + "disassembly": make_link("/functions/%s/disassembly" % addr), + }, }) - if len(functions) >= 10000: # Higher limit for MCP client - break + count += 1 - return { + result = { "success": True, - "size": len(functions), - "result": functions, # MCP client expects "result" key + "result": functions, + "size": total, + "offset": offset, + "limit": limit, + "_links": { + "self": make_link("/functions?offset=%d&limit=%d" % (offset, limit)), + }, } + if offset + count < total: + result["_links"]["next"] = make_link("/functions?offset=%d&limit=%d" % (offset + limit, limit)) + if offset > 0: + result["_links"]["prev"] = make_link("/functions?offset=%d&limit=%d" % (max(0, offset - limit), limit)) + return result - def handle_function_detail(self, path): + def handle_function_detail(self, exchange, addr_str): if not self.program: - return {"success": False, "error": "No program loaded"} - - # Extract function name or address from path - parts = path.split("/") - if len(parts) < 3: - return {"success": False, "error": "Invalid path"} - - name_or_addr = parts[2] - fm = self.program.getFunctionManager() - - # Try to find by name first - func = None - for f in fm.getFunctions(True): - if f.getName() == name_or_addr: - func = f - break - - # If not found, try by address + return self._no_program() + func = self._find_function_at(addr_str) if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} + return {"success": True, "result": self._build_function_info(func)} + + def handle_function_by_name(self, exchange, name): + if not self.program: + return self._no_program() + func = self._find_function_by_name(name) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}} + return {"success": True, "result": self._build_function_info(func)} + + # -- Decompile -- + + def handle_decompile(self, exchange, addr_str): + if not self.program: + return self._no_program() + func = self._find_function_at(addr_str) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} + return self._decompile_function(func) + + def handle_decompile_by_name(self, exchange, name): + if not self.program: + return self._no_program() + func = self._find_function_by_name(name) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}} + return self._decompile_function(func) + + # -- Disassembly -- + + def handle_disassembly(self, exchange, addr_str): + if not self.program: + return self._no_program() + func = self._find_function_at(addr_str) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} + return self._build_disassembly(func) + + def handle_disassembly_by_name(self, exchange, name): + if not self.program: + return self._no_program() + func = self._find_function_by_name(name) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}} + return self._build_disassembly(func) + + # -- Variables -- + + def handle_function_variables(self, exchange, addr_str): + if not self.program: + return self._no_program() + func = self._find_function_at(addr_str) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} + + variables = [] + # Parameters + for param in func.getParameters(): + variables.append({ + "name": param.getName(), + "type": param.getDataType().getName(), + "storage": str(param.getVariableStorage()), + "kind": "parameter", + "ordinal": param.getOrdinal(), + }) + # Local variables + for local in func.getLocalVariables(): + var_info = { + "name": local.getName(), + "type": local.getDataType().getName(), + "storage": str(local.getVariableStorage()), + "kind": "local", + } try: - addr = self.program.getAddressFactory().getAddress(name_or_addr) - func = fm.getFunctionAt(addr) + so = local.getStackOffset() + if so != 0: + var_info["stackOffset"] = so except: pass + variables.append(var_info) + addr = str(func.getEntryPoint()) + return { + "success": True, + "result": { + "name": func.getName(), + "address": addr, + "variables": variables, + "parameterCount": func.getParameterCount(), + "localVariableCount": len(func.getLocalVariables()), + } + } + + # -- Patch (rename / comment) -- + + def _handle_patch_function_impl(self, exchange, identifier, by_name): + if not self.program: + return self._no_program() + func = self._get_function(identifier, by_name) if not func: - return {"success": False, "error": "Function not found: " + name_or_addr} + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % identifier}} + + body = parse_json_body(exchange) + new_name = body.get("name") + comment = body.get("comment") + + if not new_name and comment is None: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide 'name' and/or 'comment' in request body"}} + + old_name = func.getName() + addr = str(func.getEntryPoint()) + result = {"address": addr, "oldName": old_name} + + def do_patch(): + if new_name: + func.setName(new_name, SourceType.USER_DEFINED) + result["newName"] = new_name + if comment is not None: + func.setComment(comment) + result["comment"] = comment + + with_transaction(self.program, "Patch function", do_patch) + result["message"] = "Function updated successfully" + return {"success": True, "result": result} + + def handle_patch_function(self, exchange, addr_str): + return self._handle_patch_function_impl(exchange, addr_str, by_name=False) + + def handle_patch_function_by_name(self, exchange, name): + return self._handle_patch_function_impl(exchange, name, by_name=True) + + # -- Create function -- + + def handle_create_function(self, exchange): + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + addr_str = body.get("address") + if not addr_str: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' in request body"}} + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + except: + return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} + + result_holder = [None] + + def do_create(): + fm = self.program.getFunctionManager() + func = fm.createFunction(None, addr, None, SourceType.USER_DEFINED) + result_holder[0] = func + + try: + with_transaction(self.program, "Create function", do_create) + func = result_holder[0] + if func: + return ({"success": True, "result": { + "name": func.getName(), + "address": str(func.getEntryPoint()), + "message": "Function created successfully", + }}, 201) + return {"success": False, "error": {"code": "CREATE_FAILED", "message": "Failed to create function at %s" % addr_str}} + except Exception as e: + return {"success": False, "error": {"code": "CREATE_ERROR", "message": str(e)}} + + # -- Signature -- + + def _handle_signature_impl(self, exchange, identifier, by_name): + if not self.program: + return self._no_program() + func = self._get_function(identifier, by_name) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % identifier}} + + body = parse_json_body(exchange) + sig_str = body.get("signature") + if not sig_str: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'signature' in request body"}} + + addr = str(func.getEntryPoint()) + try: + from ghidra.app.util.cparser.C import CParser + dtm = self.program.getDataTypeManager() + parser = CParser(dtm) + parsed = parser.parse(sig_str + ";") + + def do_set_sig(): + from ghidra.program.model.data import FunctionDefinitionDataType + if isinstance(parsed, FunctionDefinitionDataType): + from ghidra.app.cmd.function import ApplyFunctionSignatureCmd + cmd = ApplyFunctionSignatureCmd(func.getEntryPoint(), parsed, SourceType.USER_DEFINED) + cmd.applyTo(self.program, getMonitor()) + + with_transaction(self.program, "Set function signature", do_set_sig) + return {"success": True, "result": { + "address": addr, + "signature": sig_str, + "message": "Signature updated", + }} + except Exception as e: + # Fallback: try just setting name from signature + return {"success": False, "error": {"code": "SIGNATURE_ERROR", "message": "Could not parse signature: %s" % str(e)}} + + def handle_signature(self, exchange, addr_str): + return self._handle_signature_impl(exchange, addr_str, by_name=False) + + def handle_signature_by_name(self, exchange, name): + return self._handle_signature_impl(exchange, name, by_name=True) + + # ================================================================== + # Data Handlers + # ================================================================== + + def handle_data_list(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + addr_filter = params.get("addr") + name_filter = params.get("name") + name_contains = params.get("name_contains") + type_filter = params.get("type") + + # Single address lookup + if addr_filter: + try: + addr = self.program.getAddressFactory().getAddress(addr_filter) + data = self.program.getListing().getDataAt(addr) + if data: + value = data.getDefaultValueRepresentation() + if value and len(value) > 200: + value = value[:200] + "..." + item = { + "address": str(data.getAddress()), + "type": data.getDataType().getName(), + "length": data.getLength(), + "value": value, + } + sym = self.program.getSymbolTable().getPrimarySymbol(addr) + if sym: + item["name"] = sym.getName() + return {"success": True, "result": [item], "size": 1, "offset": 0, "limit": 1} + return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit} + except: + return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit} + + data_items = [] + listing = self.program.getListing() + st = self.program.getSymbolTable() + count = 0 + skipped = 0 + + for data in listing.getDefinedData(True): + if count >= limit: + break + # Type filter + if type_filter and type_filter.lower() not in data.getDataType().getName().lower(): + continue + # Name filters (require symbol lookup) + if name_filter or name_contains: + sym = st.getPrimarySymbol(data.getAddress()) + sym_name = sym.getName() if sym else "" + if name_filter and sym_name != name_filter: + continue + if name_contains and name_contains.lower() not in sym_name.lower(): + continue + if skipped < offset: + skipped += 1 + continue + value = data.getDefaultValueRepresentation() + if value and len(value) > 200: + value = value[:200] + "..." + item = { + "address": str(data.getAddress()), + "type": data.getDataType().getName(), + "length": data.getLength(), + "value": value, + } + sym = st.getPrimarySymbol(data.getAddress()) + if sym: + item["name"] = sym.getName() + item["_links"] = {"self": make_link("/data/%s" % str(data.getAddress()))} + data_items.append(item) + count += 1 return { "success": True, - "name": func.getName(), - "address": str(func.getEntryPoint()), - "signature": str(func.getSignature()), - "body": str(func.getBody()), - "callingConvention": func.getCallingConventionName(), - "parameterCount": func.getParameterCount(), + "result": data_items, + "offset": offset, + "limit": limit, } - def handle_strings(self): + def handle_data_create(self, exchange): if not self.program: - return {"success": False, "error": "No program loaded"} + return self._no_program() + body = parse_json_body(exchange) + addr_str = body.get("address") + if not addr_str: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}} + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + except: + return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} + + # Label creation (newName field) + new_name = body.get("newName") + if new_name: + def do_label(): + self.program.getSymbolTable().createLabel(addr, new_name, SourceType.USER_DEFINED) + try: + with_transaction(self.program, "Create label", do_label) + return {"success": True, "result": {"address": addr_str, "name": new_name, "message": "Label created"}} + except Exception as e: + return {"success": False, "error": {"code": "LABEL_ERROR", "message": str(e)}} + + # Data creation (type field) + type_name = body.get("type") + if not type_name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'type' or 'newName'"}} + + dtm = self.program.getDataTypeManager() + dt = resolve_data_type(dtm, type_name) + if not dt: + return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}} + + def do_create_data(): + from ghidra.program.model.data import DataUtilities + DataUtilities.createData( + self.program, addr, dt, -1, False, + DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA + ) + + try: + with_transaction(self.program, "Create data", do_create_data) + return ({"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data created"}}, 201) + except Exception as e: + return {"success": False, "error": {"code": "DATA_ERROR", "message": str(e)}} + + def handle_data_delete(self, exchange): + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + addr_str = body.get("address") + if not addr_str: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address'"}} + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + except: + return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} + + def do_delete(): + self.program.getListing().clearCodeUnits(addr, addr, False) + + try: + with_transaction(self.program, "Delete data", do_delete) + return {"success": True, "result": {"address": addr_str, "message": "Data deleted"}} + except Exception as e: + return {"success": False, "error": {"code": "DELETE_ERROR", "message": str(e)}} + + def handle_data_type(self, exchange): + """Change data type at an address (clear + recreate).""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + addr_str = body.get("address") + type_name = body.get("type") + if not addr_str or not type_name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' and/or 'type'"}} + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + except: + return {"success": False, "error": {"code": "INVALID_ADDRESS", "message": "Invalid address: %s" % addr_str}} + + dtm = self.program.getDataTypeManager() + dt = resolve_data_type(dtm, type_name) + if not dt: + return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % type_name}} + + def do_retype(): + from ghidra.program.model.data import DataUtilities + self.program.getListing().clearCodeUnits(addr, addr, False) + DataUtilities.createData( + self.program, addr, dt, -1, False, + DataUtilities.ClearDataMode.CLEAR_ALL_UNDEFINED_CONFLICT_DATA + ) + + try: + with_transaction(self.program, "Change data type", do_retype) + return {"success": True, "result": {"address": addr_str, "type": type_name, "message": "Data type changed"}} + except Exception as e: + return {"success": False, "error": {"code": "TYPE_ERROR", "message": str(e)}} + + # ================================================================== + # Strings Handler + # ================================================================== + + def handle_strings(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 2000) + offset = parse_int(params.get("offset"), 0) + filter_str = params.get("filter") + min_length = parse_int(params.get("min_length"), 2) strings = [] listing = self.program.getListing() + ref_mgr = self.program.getReferenceManager() + count = 0 + skipped = 0 - # Iterate through all defined data and filter for string types - for data in listing.getDefinedData(True): # True = forward iteration + for data in listing.getDefinedData(True): + if count >= limit: + break try: dt = data.getDataType() if not dt: continue - - # Check if data type is a string variant type_name = dt.getName().lower() - if "string" in type_name or type_name in ("char", "wchar"): - value = data.getValue() - if value: - str_val = str(value) - if len(str_val) > 1: # Skip single chars - strings.append({ - "address": str(data.getAddress()), - "value": str_val[:200], # Truncate long strings - "length": len(str_val) - }) - if len(strings) >= 5000: - break + if "string" not in type_name and type_name not in ("char", "wchar"): + continue + value = data.getValue() + if not value: + continue + str_val = str(value) + if len(str_val) < min_length: + continue + if filter_str and filter_str.lower() not in str_val.lower(): + continue + if skipped < offset: + skipped += 1 + continue + item = { + "address": str(data.getAddress()), + "value": str_val[:200], + "length": len(str_val), + "type": dt.getName(), + } + # Xref count + ref_count = 0 + refs = ref_mgr.getReferencesTo(data.getAddress()) + while refs.hasNext(): + refs.next() + ref_count += 1 + item["xrefCount"] = ref_count + # Label name + sym = self.program.getSymbolTable().getPrimarySymbol(data.getAddress()) + if sym: + item["name"] = sym.getName() + strings.append(item) + count += 1 except: pass return { "success": True, + "result": strings, "size": len(strings), - "result": strings, # MCP client expects "result" key + "offset": offset, + "limit": limit, } - def handle_decompile_by_path(self, path): - """Handle /functions/{address}/decompile or /functions/by-name/{name}/decompile""" + # ================================================================== + # Memory Handlers + # ================================================================== + + def handle_memory_read(self, exchange): if not self.program: - return {"success": False, "error": "No program loaded"} + return self._no_program() + params = parse_query_params(exchange) + addr_str = params.get("address") + if not addr_str: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}} - # Parse address or name from path - # /functions/000496e8/decompile or /functions/by-name/main/decompile - parts = path.split("/") + length = parse_int(params.get("length"), 16) + fmt = params.get("format", "hex") + if length <= 0 or length > 4096: + return {"success": False, "error": {"code": "INVALID_PARAMETER", "message": "Length must be 1-4096"}} - func = None - fm = self.program.getFunctionManager() - - if "by-name" in path and len(parts) >= 4: - # /functions/by-name/{name}/decompile - name = parts[3] - for f in fm.getFunctions(True): - if f.getName() == name: - func = f - break - elif len(parts) >= 3: - # /functions/{address}/decompile - addr_str = parts[2] - try: - addr = self.program.getAddressFactory().getAddress(addr_str) - func = fm.getFunctionAt(addr) - except: - pass - - if not func: - return {"success": False, "error": "Function not found from path: " + path} - - # Decompile try: - result = self.decompiler.decompileFunction(func, 30, getMonitor()) - if result and result.decompileCompleted(): - code = result.getDecompiledFunction().getC() - return { - "success": True, - "result": { - "name": func.getName(), - "address": str(func.getEntryPoint()), - "decompiled_text": code, - "ccode": code, - } - } + addr = self.program.getAddressFactory().getAddress(addr_str) + mem = self.program.getMemory() + from jarray import zeros + byte_array = zeros(length, 'b') + bytes_read = mem.getBytes(addr, byte_array) + + if fmt == "base64": + from java.util import Base64 + # Convert to unsigned bytes for Base64 + import array as pyarray + unsigned = bytearray([(b & 0xff) for b in byte_array[:bytes_read]]) + formatted = Base64.getEncoder().encodeToString(unsigned) + elif fmt == "string": + chars = [] + for i in range(bytes_read): + b = byte_array[i] & 0xff + if 32 <= b < 127: + chars.append(chr(b)) + else: + chars.append('.') + formatted = ''.join(chars) else: - return {"success": False, "error": "Decompilation failed"} + formatted = bytes_to_hex(byte_array[:bytes_read]) + + return { + "success": True, + "result": { + "address": str(addr), + "bytesRead": bytes_read, + "format": fmt, + "bytes": formatted, + "hexBytes": bytes_to_hex(byte_array[:bytes_read]), + "rawBytes": bytes_to_hex(byte_array[:bytes_read]), + } + } except Exception as e: - return {"success": False, "error": str(e)} + return {"success": False, "error": {"code": "MEMORY_ERROR", "message": str(e)}} - def handle_decompile(self, exchange): + def handle_memory_write(self, exchange, addr_str): if not self.program: - return {"success": False, "error": "No program loaded"} + return self._no_program() + body = parse_json_body(exchange) + bytes_str = body.get("bytes") + fmt = body.get("format", "hex") - # Get function name from query params - query = exchange.getRequestURI().getQuery() - if not query: - return {"success": False, "error": "Missing 'name' or 'address' parameter"} + if not bytes_str: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'bytes'"}} - params = {} - for part in query.split("&"): - if "=" in part: - k, v = part.split("=", 1) - params[k] = v + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + if fmt == "base64": + from java.util import Base64 + byte_vals = list(Base64.getDecoder().decode(bytes_str)) + elif fmt == "string": + byte_vals = [ord(c) for c in bytes_str] + else: + byte_vals = hex_to_bytes(bytes_str) + # Convert to Java byte array + from jarray import zeros + java_bytes = zeros(len(byte_vals), 'b') + for i, b in enumerate(byte_vals): + java_bytes[i] = b if b <= 127 else b - 256 + + def do_write(): + self.program.getMemory().setBytes(addr, java_bytes) + + with_transaction(self.program, "Write memory", do_write) + return {"success": True, "result": {"address": str(addr), "bytesWritten": len(byte_vals)}} + except Exception as e: + return {"success": False, "error": {"code": "MEMORY_ERROR", "message": str(e)}} + + def handle_memory_blocks(self, exchange): + """List memory blocks (alias for /segments).""" + return self.handle_segments(exchange) + + def handle_set_comment(self, exchange, addr_str, comment_type): + """Set a comment at a specific address.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + comment_text = body.get("comment", "") + + ct = COMMENT_TYPE_MAP.get(comment_type.lower()) + if ct is None: + return {"success": False, "error": { + "code": "INVALID_COMMENT_TYPE", + "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type + }} + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + listing = self.program.getListing() + cu = listing.getCodeUnitAt(addr) + if not cu: + cu = listing.getCodeUnitContaining(addr) + if not cu: + return {"success": False, "error": {"code": "NO_CODE_UNIT", "message": "No code unit at address: %s" % addr_str}} + + def do_comment(): + cu.setComment(ct, comment_text if comment_text else None) + + with_transaction(self.program, "Set comment", do_comment) + return {"success": True, "result": { + "address": addr_str, + "commentType": comment_type, + "comment": comment_text, + "message": "Comment set successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}} + + # ================================================================== + # Segments Handler + # ================================================================== + + def handle_segments(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + name_filter = params.get("name") + + segments = [] + for block in self.program.getMemory().getBlocks(): + if name_filter and name_filter not in block.getName(): + continue + segments.append({ + "name": block.getName(), + "start": str(block.getStart()), + "end": str(block.getEnd()), + "size": block.getSize(), + "readable": block.isRead(), + "writable": block.isWrite(), + "executable": block.isExecute(), + "initialized": block.isInitialized(), + "_links": { + "self": make_link("/segments/%s" % block.getName()), + "memory": make_link("/memory?address=%s&length=1024" % str(block.getStart())), + }, + }) + + return {"success": True, "result": segments} + + # ================================================================== + # Symbol Handlers + # ================================================================== + + def handle_symbols(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + name_filter = params.get("name") + type_filter = params.get("type") + + symbols = [] + st = self.program.getSymbolTable() + count = 0 + skipped = 0 + + for symbol in st.getAllSymbols(True): + if count >= limit: + break + if name_filter and name_filter.lower() not in symbol.getName().lower(): + continue + if type_filter and str(symbol.getSymbolType()).lower() != type_filter.lower(): + continue + if skipped < offset: + skipped += 1 + continue + symbols.append({ + "name": symbol.getName(), + "address": str(symbol.getAddress()), + "namespace": symbol.getParentNamespace().getName(), + "type": str(symbol.getSymbolType()), + "isPrimary": symbol.isPrimary(), + "isExternal": symbol.isExternal(), + }) + count += 1 + + return { + "success": True, + "result": symbols, + "offset": offset, + "limit": limit, + "_links": { + "self": make_link("/symbols"), + "imports": make_link("/symbols/imports"), + "exports": make_link("/symbols/exports"), + }, + } + + def handle_imports(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + + imports = [] + count = 0 + skipped = 0 + for symbol in self.program.getSymbolTable().getExternalSymbols(): + if count >= limit: + break + if skipped < offset: + skipped += 1 + continue + imports.append({ + "name": symbol.getName(), + "address": str(symbol.getAddress()), + "namespace": symbol.getParentNamespace().getName(), + }) + count += 1 + + return {"success": True, "result": imports, "offset": offset, "limit": limit} + + def handle_exports(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + + exports = [] + count = 0 + skipped = 0 + for symbol in self.program.getSymbolTable().getAllSymbols(True): + if count >= limit: + break + if not symbol.isExternalEntryPoint(): + continue + if skipped < offset: + skipped += 1 + continue + exports.append({ + "name": symbol.getName(), + "address": str(symbol.getAddress()), + }) + count += 1 + + return {"success": True, "result": exports, "offset": offset, "limit": limit} + + # ================================================================== + # Cross-References Handler + # ================================================================== + + def handle_xrefs(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + to_addr_str = params.get("to_addr") + from_addr_str = params.get("from_addr") + type_filter = params.get("type") + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + + if not to_addr_str and not from_addr_str: + return {"success": False, "error": { + "code": "MISSING_PARAMETER", + "message": "Either 'to_addr' or 'from_addr' parameter is required" + }} + + xrefs = [] + ref_mgr = self.program.getReferenceManager() + + try: + if to_addr_str: + addr = self.program.getAddressFactory().getAddress(to_addr_str) + refs = ref_mgr.getReferencesTo(addr) + count = 0 + skipped = 0 + while refs.hasNext() and count < limit: + ref = refs.next() + if type_filter and self._get_ref_type_name(ref.getReferenceType()).lower() != type_filter.lower(): + continue + if skipped < offset: + skipped += 1 + continue + xrefs.append(self._build_xref_info(ref)) + count += 1 + + if from_addr_str: + addr = self.program.getAddressFactory().getAddress(from_addr_str) + refs = ref_mgr.getReferencesFrom(addr) + count = 0 + skipped = 0 + for ref in refs: + if count >= limit: + break + if type_filter and self._get_ref_type_name(ref.getReferenceType()).lower() != type_filter.lower(): + continue + if skipped < offset: + skipped += 1 + continue + xrefs.append(self._build_xref_info(ref)) + count += 1 + + return {"success": True, "result": xrefs, "offset": offset, "limit": limit} + except Exception as e: + return {"success": False, "error": {"code": "XREF_ERROR", "message": str(e)}} + + # ================================================================== + # Classes & Namespaces Handlers + # ================================================================== + + def handle_classes(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + + class_names = set() + for symbol in self.program.getSymbolTable().getAllSymbols(True): + ns = symbol.getParentNamespace() + if ns and not ns.isGlobal() and ns.getSymbol().getSymbolType().isNamespace(): + class_names.add(ns.getName(True)) + + sorted_names = sorted(class_names) + start = min(offset, len(sorted_names)) + end = min(offset + limit, len(sorted_names)) + + classes = [] + for name in sorted_names[start:end]: + info = {"name": name} + if "::" in name: + info["namespace"] = name[:name.rfind("::")] + info["simpleName"] = name[name.rfind("::") + 2:] + else: + info["namespace"] = "global" + info["simpleName"] = name + classes.append(info) + + return { + "success": True, + "result": classes, + "size": len(sorted_names), + "offset": offset, + "limit": limit, + } + + def handle_namespaces(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + + namespaces = set() + for symbol in self.program.getSymbolTable().getAllSymbols(True): + ns = symbol.getParentNamespace() + if ns and not ns.isGlobal(): + namespaces.add(ns.getName(True)) + + sorted_ns = sorted(namespaces) + start = min(offset, len(sorted_ns)) + end = min(offset + limit, len(sorted_ns)) + + return { + "success": True, + "result": sorted_ns[start:end], + "size": len(sorted_ns), + "offset": offset, + "limit": limit, + } + + # ================================================================== + # Structs Handlers + # ================================================================== + + def handle_structs(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + + # Detail lookup by name + name_lookup = params.get("name") + if name_lookup: + return self._handle_struct_detail(name_lookup) + + # List structs + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + category_filter = params.get("category") + + from ghidra.program.model.data import Structure, Union + + structs = [] + dtm = self.program.getDataTypeManager() + count = 0 + skipped = 0 + + it = dtm.getAllDataTypes() + while it.hasNext() and count < limit: + dt = it.next() + if not isinstance(dt, (Structure, Union)): + continue + if category_filter and category_filter.lower() not in dt.getCategoryPath().getPath().lower(): + continue + if skipped < offset: + skipped += 1 + continue + structs.append({ + "name": dt.getName(), + "category": dt.getCategoryPath().getPath(), + "path": dt.getPathName(), + "size": dt.getLength(), + "type": "struct" if isinstance(dt, Structure) else "union", + "numFields": dt.getNumComponents(), + "_links": {"self": make_link("/structs?name=%s" % dt.getName())}, + }) + count += 1 + + return {"success": True, "result": structs, "offset": offset, "limit": limit} + + def _handle_struct_detail(self, name): + from ghidra.program.model.data import Composite + dt = self._find_struct(name) + if not dt: + return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}} + + result = { + "name": dt.getName(), + "category": dt.getCategoryPath().getPath(), + "path": dt.getPathName(), + "size": dt.getLength(), + "description": dt.getDescription() or "", + } + + if isinstance(dt, Composite): + fields = [] + for comp in dt.getComponents(): + field = { + "name": comp.getFieldName(), + "type": comp.getDataType().getName(), + "typePath": comp.getDataType().getPathName(), + "offset": comp.getOffset(), + "length": comp.getLength(), + } + if comp.getComment(): + field["comment"] = comp.getComment() + fields.append(field) + result["fields"] = fields + result["numFields"] = len(fields) + + result["_links"] = { + "self": make_link("/structs?name=%s" % name), + "structs": make_link("/structs"), + } + return {"success": True, "result": result} + + def handle_struct_create(self, exchange): + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + name = body.get("name") + if not name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}} + + category = body.get("category", "/") + description = body.get("description", "") + + try: + from ghidra.program.model.data import StructureDataType, CategoryPath, DataTypeConflictHandler + cat_path = CategoryPath(category) + new_struct = StructureDataType(cat_path, name, 0) + if description: + new_struct.setDescription(description) + + result_holder = [None] + + def do_create(): + dtm = self.program.getDataTypeManager() + result_holder[0] = dtm.addDataType(new_struct, DataTypeConflictHandler.DEFAULT_HANDLER) + + with_transaction(self.program, "Create struct", do_create) + dt = result_holder[0] + return ({"success": True, "result": { + "name": dt.getName() if dt else name, + "path": dt.getPathName() if dt else "%s/%s" % (category, name), + "category": category, + "size": 0, + "message": "Struct created successfully", + }}, 201) + except Exception as e: + return {"success": False, "error": {"code": "STRUCT_ERROR", "message": str(e)}} + + def handle_struct_addfield(self, exchange): + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + struct_name = body.get("struct") + field_name = body.get("fieldName") + field_type = body.get("fieldType") + + if not struct_name or not field_name or not field_type: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct', 'fieldName', or 'fieldType'"}} + + struct = self._find_struct(struct_name) + if not struct: + return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}} + + dtm = self.program.getDataTypeManager() + dt = resolve_data_type(dtm, field_type) + if not dt: + return {"success": False, "error": {"code": "UNKNOWN_TYPE", "message": "Cannot resolve type: %s" % field_type}} + + field_offset = body.get("offset") + comment = body.get("comment") + result_info = {} + + def do_add(): + if field_offset is not None: + struct.insertAtOffset(int(field_offset), dt, dt.getLength(), field_name, comment) + result_info["offset"] = int(field_offset) + else: + struct.add(dt, dt.getLength(), field_name, comment) + result_info["offset"] = struct.getLength() - dt.getLength() + result_info["length"] = dt.getLength() + result_info["structSize"] = struct.getLength() + + try: + with_transaction(self.program, "Add struct field", do_add) + return {"success": True, "result": { + "struct": struct_name, + "fieldName": field_name, + "fieldType": field_type, + "offset": result_info.get("offset", 0), + "length": result_info.get("length", 0), + "structSize": result_info.get("structSize", 0), + "message": "Field added successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "FIELD_ERROR", "message": str(e)}} + + def handle_struct_updatefield(self, exchange): + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + struct_name = body.get("struct") + if not struct_name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'struct'"}} + + struct = self._find_struct(struct_name) + if not struct: + return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % struct_name}} + + field_name = body.get("fieldName") + field_offset = body.get("fieldOffset") + new_name = body.get("newName") + new_type = body.get("newType") + new_comment = body.get("newComment") + + if not new_name and not new_type and new_comment is None: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Provide at least one of: newName, newType, newComment"}} + + # Find the component + component = None + if field_offset is not None: + component = struct.getComponentAt(int(field_offset)) + elif field_name: + for comp in struct.getComponents(): + if comp.getFieldName() == field_name: + component = comp + break + + if not component: + return {"success": False, "error": {"code": "FIELD_NOT_FOUND", "message": "Field not found"}} + + orig_name = component.getFieldName() + orig_type = component.getDataType().getName() + orig_comment = component.getComment() + + def do_update(): + if new_name: + component.setFieldName(new_name) + if new_comment is not None: + component.setComment(new_comment) + if new_type: + dtm = self.program.getDataTypeManager() + dt = resolve_data_type(dtm, new_type) + if dt: + component.setDataType(dt) + + try: + with_transaction(self.program, "Update struct field", do_update) + return {"success": True, "result": { + "struct": struct_name, + "offset": component.getOffset(), + "originalName": orig_name, + "originalType": orig_type, + "originalComment": orig_comment, + "newName": new_name or orig_name, + "newType": new_type or orig_type, + "newComment": new_comment if new_comment is not None else orig_comment, + "length": component.getLength(), + "message": "Field updated successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "UPDATE_ERROR", "message": str(e)}} + + def handle_struct_delete(self, exchange): + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + name = body.get("name") + if not name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name'"}} + + dt = self._find_struct(name) + if not dt: + return {"success": False, "error": {"code": "STRUCT_NOT_FOUND", "message": "Struct not found: %s" % name}} + + category = dt.getCategoryPath().getPath() + path = dt.getPathName() + + def do_delete(): + dtm = self.program.getDataTypeManager() + dtm.remove(dt, getMonitor()) + + try: + with_transaction(self.program, "Delete struct", do_delete) + return {"success": True, "result": { + "name": name, + "path": path, + "category": category, + "message": "Struct deleted successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "DELETE_ERROR", "message": str(e)}} + + # ================================================================== + # Analysis Handlers + # ================================================================== + + def handle_analysis_info(self, exchange): + if not self.program: + return self._no_program() + result = { + "program": self.program.getName(), + "processor": str(self.program.getLanguage().getProcessor()), + "addressSize": self.program.getAddressFactory().getDefaultAddressSpace().getSize(), + "programLanguage": str(self.program.getLanguage()), + "availableAnalysis": ["callgraph", "xrefs", "decompile", "dataflow"], + "_links": { + "self": make_link("/analysis"), + "callgraph": make_link("/analysis/callgraph"), + "dataflow": make_link("/analysis/dataflow"), + "program": make_link("/program"), + }, + } + return {"success": True, "result": result} + + def handle_analysis_run(self, exchange): + if not self.program: + return self._no_program() + try: + # In headless mode, try to re-run auto-analysis + from ghidra.app.script import GhidraScriptUtil + analyzeAll(self.program) + return {"success": True, "result": { + "program": self.program.getName(), + "analysis_triggered": True, + "message": "Analysis initiated on program", + }} + except Exception as e: + # analyzeAll may not be available; graceful fallback + return {"success": True, "result": { + "program": self.program.getName(), + "analysis_triggered": False, + "message": "Analysis request received (headless mode: %s)" % str(e), + }} + + def handle_callgraph(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + func_name = params.get("name") + func_addr = params.get("address") + max_depth = parse_int(params.get("max_depth"), 3) + + start_func = None + if func_addr: + start_func = self._find_function_at(func_addr) + elif func_name: + start_func = self._find_function_by_name(func_name) + else: + # Use first entry point + entry_iter = self.program.getSymbolTable().getExternalEntryPointIterator() + if entry_iter.hasNext(): + start_func = self.program.getFunctionManager().getFunctionAt(entry_iter.next()) + + if not start_func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found for callgraph"}} + + nodes = [] + edges = [] + visited = set() + + self._build_callgraph(start_func, nodes, edges, visited, 0, max_depth) + + graph = { + "root": start_func.getName(), + "rootAddress": str(start_func.getEntryPoint()), + "maxDepth": max_depth, + "nodes": nodes, + "edges": edges, + "nodeCount": len(nodes), + "edgeCount": len(edges), + } + return {"success": True, "result": graph} + + def _build_callgraph(self, func, nodes, edges, visited, depth, max_depth): + func_id = str(func.getEntryPoint()) + if func_id in visited: + return + visited.add(func_id) + + nodes.append({ + "id": func_id, + "name": func.getName(), + "address": func_id, + "depth": depth, + }) + + if depth >= max_depth: + return + + ref_mgr = self.program.getReferenceManager() + body = func.getBody() + addr_iter = body.getAddresses(True) + + while addr_iter.hasNext(): + addr = addr_iter.next() + refs = ref_mgr.getReferencesFrom(addr) + for ref in refs: + if ref.getReferenceType().isCall(): + called = self.program.getFunctionManager().getFunctionAt(ref.getToAddress()) + if called: + edges.append({ + "from": func_id, + "to": str(called.getEntryPoint()), + "type": "call", + "callSite": str(addr), + }) + self._build_callgraph(called, nodes, edges, visited, depth + 1, max_depth) + + def handle_dataflow(self, exchange): + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + addr_str = params.get("address") + if not addr_str: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'address' parameter"}} + + direction = params.get("direction", "forward") + max_steps = parse_int(params.get("max_steps"), 50) + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + func = self.program.getFunctionManager().getFunctionContaining(addr) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function containing address: %s" % addr_str}} + + # Simplified dataflow: walk instructions from the starting address + listing = self.program.getListing() + steps = [] + current = addr + step_count = 0 + + if direction == "forward": + inst_iter = listing.getInstructions(addr, True) + else: + inst_iter = listing.getInstructions(addr, False) + + while inst_iter.hasNext() and step_count < max_steps: + instr = inst_iter.next() + # Stay within function body + if not func.getBody().contains(instr.getAddress()): + break + full_repr = str(instr) + steps.append({ + "address": str(instr.getAddress()), + "instruction": full_repr, + "mnemonic": instr.getMnemonicString(), + "bytes": bytes_to_hex(instr.getBytes()), + }) + step_count += 1 + + return { + "success": True, + "result": { + "start_address": addr_str, + "function": func.getName(), + "direction": direction, + "max_steps": max_steps, + "total_steps": len(steps), + "steps": steps, + "sources": [], + "sinks": [], + } + } + except Exception as e: + return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}} + + # ================================================================== + # Legacy Compatibility + # ================================================================== + + def handle_decompile_legacy(self, exchange): + """Handle GET /decompile?name=X or ?address=X (backwards compat).""" + if not self.program: + return self._no_program() + params = parse_query_params(exchange) name = params.get("name") or params.get("address") if not name: - return {"success": False, "error": "Missing 'name' or 'address' parameter"} - - # Find function - fm = self.program.getFunctionManager() - func = None - for f in fm.getFunctions(True): - if f.getName() == name: - func = f - break + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Missing 'name' or 'address' parameter"}} + # Try by name first, then by address + func = self._find_function_by_name(name) if not func: - try: - addr = self.program.getAddressFactory().getAddress(name) - func = fm.getFunctionAt(addr) - except: - pass - + func = self._find_function_at(name) if not func: - return {"success": False, "error": "Function not found: " + name} + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "Function not found: %s" % name}} - # Decompile - try: - result = self.decompiler.decompileFunction(func, 30, getMonitor()) - if result and result.decompileCompleted(): - code = result.getDecompiledFunction().getC() - return { - "success": True, - "name": func.getName(), - "address": str(func.getEntryPoint()), - "decompiled": code - } - else: - return {"success": False, "error": "Decompilation failed"} - except Exception as e: - return {"success": False, "error": str(e)} + result = self.decompiler.decompileFunction(func, 30, getMonitor()) + if result and result.decompileCompleted(): + code = result.getDecompiledFunction().getC() + return { + "success": True, + "name": func.getName(), + "address": str(func.getEntryPoint()), + "decompiled": code, + } + return {"success": False, "error": {"code": "DECOMPILE_FAILED", "message": "Decompilation failed"}} +# ======================================================================== +# Server Startup +# ======================================================================== + def run_server(port, program, decompiler): - """Start the HTTP server""" + """Start the HTTP server with a single catch-all handler.""" server = HttpServer.create(InetSocketAddress(port), 0) server.createContext("/", GhydraMCPHandler(program, decompiler)) server.setExecutor(Executors.newCachedThreadPool()) server.start() - println("GhydraMCP HTTP server started on port " + str(port)) + println("[GhydraMCP] HTTP server started on port %d" % port) return server -# Main script execution +# ======================================================================== +# Main Entry Point +# ======================================================================== + def main(): port = DEFAULT_PORT @@ -313,7 +2103,7 @@ def main(): try: port = int(args[0]) except: - println("Invalid port number, using default: " + str(DEFAULT_PORT)) + println("Invalid port number, using default: %d" % DEFAULT_PORT) # Initialize decompiler decompiler = DecompInterface() @@ -322,27 +2112,27 @@ def main(): println("=========================================") println(" GhydraMCP Headless HTTP Server") println("=========================================") - println(" API Version: " + API_VERSION_STRING + " (compat: " + str(API_VERSION) + ")") - println(" Port: " + str(port)) - println(" Program: " + (currentProgram.getName() if currentProgram else "None")) - println(" Script: Python/Jython") + println(" API Version: %s (compat: %d)" % (API_VERSION_STRING, API_VERSION)) + println(" Port: %d" % port) + println(" Program: %s" % (currentProgram.getName() if currentProgram else "None")) + println(" Script: Python/Jython (Full API)") + println(" Routes: %d" % len(ROUTES)) println("=========================================") server = run_server(port, currentProgram, decompiler) println("") println("GhydraMCP Server running. Press Ctrl+C to stop.") - println("API available at: http://localhost:" + str(port) + "/") + println("API available at: http://localhost:%d/" % port) # Keep the script running - import time try: while True: time.sleep(1) except KeyboardInterrupt: server.stop(0) - println("Server stopped.") + println("[GhydraMCP] Server stopped.") -# Run the main function +# Run main() diff --git a/docker/README.md b/docker/README.md new file mode 100644 index 0000000..aff637f --- /dev/null +++ b/docker/README.md @@ -0,0 +1,197 @@ +# GhydraMCP Docker Setup + +This directory contains Docker configuration for running GhydraMCP in headless mode. + +## Quick Start + +```bash +# Build the image +docker build -t ghydramcp:latest -f docker/Dockerfile . + +# Analyze a binary +docker run -p 8192:8192 -v /path/to/binaries:/binaries ghydramcp /binaries/sample.exe + +# Check API health +curl http://localhost:8192/ +``` + +## Architecture + +The Docker container includes: + +1. **Ghidra 11.4.2** - Full headless installation +2. **GhydraMCP Extension** - The Java plugin (installed in Extensions/) +3. **GhydraMCPServer.py** - Headless HTTP server (Jython, full API parity) + +### Why Two HTTP Servers? + +The GhydraMCP plugin (`GhydraMCPPlugin.java`) is a full Ghidra GUI plugin that requires: +- Ghidra's `PluginTool` framework +- `ProgramManager` service for program access +- GUI event handling + +These GUI services don't exist in headless mode. Instead, the container uses `GhydraMCPServer.py`, a Jython script that: +- Runs via `analyzeHeadless -postScript` +- Has direct access to `currentProgram` from the script context +- Provides **full API parity** with the GUI plugin (45 routes) +- Supports all read and write operations + +### Available Endpoints (Headless Mode) + +The headless server implements the complete GhydraMCP HTTP API: + +| Category | Endpoints | Description | +|----------|-----------|-------------| +| **Info** | `GET /`, `/info`, `/program` | API info, program metadata | +| **Functions** | `GET /functions`, `/functions/{addr}`, `/functions/by-name/{name}` | List and detail | +| **Decompile** | `GET /functions/{addr}/decompile`, `/functions/by-name/{name}/decompile` | C pseudocode | +| **Disassembly** | `GET /functions/{addr}/disassembly`, `/functions/by-name/{name}/disassembly` | Assembly listing | +| **Data** | `GET /data`, `/strings` | Defined data and strings | +| **Memory** | `GET /memory`, `/memory/blocks` | Read bytes, list segments | +| **Xrefs** | `GET /xrefs` | Cross-references (to/from) | +| **Structs** | `GET /structs` | Data type structures | +| **Symbols** | `GET /symbols`, `/imports`, `/exports` | Symbol tables | +| **Analysis** | `GET /analysis/callgraph`, `/analysis/dataflow` | Static analysis | +| **Write Ops** | `PATCH /functions/*`, `POST /data`, `POST /structs/*` | Rename, annotate, create | + +See [GHIDRA_HTTP_API.md](../GHIDRA_HTTP_API.md) for the complete API specification. + +## Container Modes + +### Headless Mode (Default) + +Imports a binary, analyzes it, and starts the HTTP API server: + +```bash +docker run -p 8192:8192 \ + -v ./samples:/binaries \ + ghydramcp /binaries/sample.exe +``` + +### Server Mode + +Opens an existing project and program: + +```bash +docker run -p 8192:8192 \ + -e GHYDRA_MODE=server \ + -v ./projects:/projects \ + ghydramcp program_name +``` + +### Analyze Mode + +Imports and analyzes without starting HTTP server: + +```bash +docker run \ + -e GHYDRA_MODE=analyze \ + -v ./samples:/binaries \ + -v ./projects:/projects \ + ghydramcp /binaries/sample.exe +``` + +### Shell Mode + +Interactive debugging: + +```bash +docker run -it \ + -e GHYDRA_MODE=shell \ + ghydramcp +``` + +## Environment Variables + +| Variable | Default | Description | +|----------|---------|-------------| +| `GHYDRA_MODE` | `headless` | Container mode (headless, server, analyze, shell) | +| `GHYDRA_PORT` | `8192` | HTTP API port | +| `GHYDRA_MAXMEM` | `2G` | JVM heap memory | +| `PROJECT_DIR` | `/projects` | Ghidra project directory | +| `PROJECT_NAME` | `GhydraMCP` | Ghidra project name | + +## Docker Compose + +Use docker-compose for easier management: + +```bash +# Development mode (hot-reload scripts) +docker compose --profile dev up ghydramcp-dev + +# Production mode +docker compose --profile prod up ghydramcp + +# Interactive shell +docker compose --profile debug run --rm ghydramcp-shell +``` + +## MCP Integration + +The GhydraMCP Python server includes Docker management tools: + +```python +# Check Docker status +await docker_status() + +# Start container for a binary +await docker_start(binary_path="/path/to/binary.exe", port=8192) + +# Wait for container to be ready +await docker_wait(port=8192, timeout=300) + +# Automatic mode - starts container if no Ghidra available +await docker_auto_start(binary_path="/path/to/binary.exe") + +# Get container logs +await docker_logs("ghydramcp-server") + +# Stop container +await docker_stop("ghydramcp-server") +``` + +## Building + +```bash +# Using Make +make build + +# Using Docker directly +docker build -t ghydramcp:latest -f docker/Dockerfile . + +# Build with specific Ghidra version +docker build -t ghydramcp:latest \ + --build-arg GHIDRA_VERSION=11.4.2 \ + --build-arg GHIDRA_DATE=20250826 \ + -f docker/Dockerfile . +``` + +## Troubleshooting + +### Container starts but API doesn't respond + +Analysis takes time. Monitor progress with: +```bash +docker logs -f ghydramcp-server +``` + +### Port already in use + +Stop existing containers: +```bash +docker stop $(docker ps -q --filter "name=ghydramcp") +``` + +### Memory issues with large binaries + +Increase JVM heap: +```bash +docker run -e GHYDRA_MAXMEM=4G -p 8192:8192 ghydramcp /binaries/large.exe +``` + +### Permission denied on volumes + +The container runs as user `ghidra` (UID 1001). Ensure volume permissions: +```bash +sudo chown -R 1001:1001 /path/to/binaries +```