diff --git a/docker/GhydraMCPServer.py b/docker/GhydraMCPServer.py index d44a800..85e4456 100644 --- a/docker/GhydraMCPServer.py +++ b/docker/GhydraMCPServer.py @@ -305,6 +305,7 @@ ROUTES = [ ("GET", r"^/strings$", "handle_strings"), # Memory + ("GET", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_get_comment"), ("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"), ("GET", r"^/memory/blocks$", "handle_memory_blocks"), ("GET", r"^/memory$", "handle_memory_read"), @@ -316,8 +317,26 @@ ROUTES = [ # Symbols ("GET", r"^/symbols/imports$", "handle_imports"), ("GET", r"^/symbols/exports$", "handle_exports"), + ("POST", r"^/symbols$", "handle_symbol_create"), + ("PATCH", r"^/symbols/([^/]+)$", "handle_symbol_rename"), + ("DELETE", r"^/symbols/([^/]+)$", "handle_symbol_delete"), ("GET", r"^/symbols$", "handle_symbols"), + # Variables + ("PATCH", r"^/functions/([^/]+)/variables/([^/]+)$", "handle_variable_rename"), + ("GET", r"^/variables$", "handle_variables"), + + # Bookmarks + ("POST", r"^/bookmarks$", "handle_bookmark_create"), + ("DELETE", r"^/bookmarks/([^/]+)$", "handle_bookmark_delete"), + ("GET", r"^/bookmarks$", "handle_bookmarks"), + + # Data types + ("POST", r"^/datatypes/enums$", "handle_enum_create"), + ("GET", r"^/datatypes/enums$", "handle_enums"), + ("POST", r"^/datatypes/typedefs$", "handle_typedef_create"), + ("GET", r"^/datatypes/typedefs$", "handle_typedefs"), + # Cross-references ("GET", r"^/xrefs$", "handle_xrefs"), @@ -734,26 +753,62 @@ class GhydraMCPHandler(HttpHandler): limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) name_filter = params.get("name") + name_contains = params.get("name_contains") + name_regex = params.get("name_matches_regex") + addr_filter = params.get("addr") grep_pattern = compile_grep(params) - functions = [] + # Compile name regex if provided + name_regex_pat = None + if name_regex: + try: + name_regex_pat = re.compile(name_regex, re.IGNORECASE) + except: + return {"success": False, "error": {"code": "INVALID_REGEX", "message": "Invalid regex: %s" % name_regex}} + fm = self.program.getFunctionManager() total = fm.getFunctionCount() + + # Single function lookup by address + if addr_filter: + func = self._find_function_at(addr_filter) + if not func: + return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit} + addr = str(func.getEntryPoint()) + return {"success": True, "result": [{ + "name": func.getName(), + "address": addr, + "signature": str(func.getSignature()), + "parameterCount": func.getParameterCount(), + "isThunk": func.isThunk(), + "_links": { + "self": make_link("/functions/%s" % addr), + "decompile": make_link("/functions/%s/decompile" % addr), + "disassembly": make_link("/functions/%s/disassembly" % addr), + }, + }], "size": 1, "offset": 0, "limit": limit} + + functions = [] count = 0 skipped = 0 for func in fm.getFunctions(True): if count >= limit: break - # Apply name filter - if name_filter and name_filter.lower() not in func.getName().lower(): + func_name = func.getName() + # Apply name filters + if name_filter and name_filter.lower() not in func_name.lower(): + continue + if name_contains and name_contains.lower() not in func_name.lower(): + continue + if name_regex_pat and not name_regex_pat.search(func_name): continue if skipped < offset: skipped += 1 continue addr = str(func.getEntryPoint()) item = { - "name": func.getName(), + "name": func_name, "address": addr, "signature": str(func.getSignature()), "parameterCount": func.getParameterCount(), @@ -1358,6 +1413,36 @@ class GhydraMCPHandler(HttpHandler): """List memory blocks (alias for /segments).""" return self.handle_segments(exchange) + def handle_get_comment(self, exchange, addr_str, comment_type): + """Get a comment at a specific address.""" + if not self.program: + return self._no_program() + + ct = COMMENT_TYPE_MAP.get(comment_type.lower()) + if ct is None: + return {"success": False, "error": { + "code": "INVALID_COMMENT_TYPE", + "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type + }} + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + listing = self.program.getListing() + cu = listing.getCodeUnitAt(addr) + if not cu: + cu = listing.getCodeUnitContaining(addr) + if not cu: + return {"success": True, "result": {"address": addr_str, "commentType": comment_type, "comment": None}} + + comment = cu.getComment(ct) + return {"success": True, "result": { + "address": addr_str, + "commentType": comment_type, + "comment": comment, + }} + except Exception as e: + return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}} + def handle_set_comment(self, exchange, addr_str, comment_type): """Set a comment at a specific address.""" if not self.program: @@ -2112,6 +2197,518 @@ class GhydraMCPHandler(HttpHandler): except Exception as e: return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}} + # ================================================================== + # Symbol CRUD Handlers + # ================================================================== + + def handle_symbol_create(self, exchange): + """POST /symbols - Create a new label/symbol.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + name = body.get("name", "") + address = body.get("address", "") + if not name or not address: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Both 'name' and 'address' are required"}} + + try: + addr = self.program.getAddressFactory().getAddress(address) + st = self.program.getSymbolTable() + + def do_create(): + st.createLabel(addr, name, SourceType.USER_DEFINED) + + with_transaction(self.program, "Create symbol", do_create) + return {"success": True, "result": { + "name": name, + "address": address, + "message": "Symbol created successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}} + + def handle_symbol_rename(self, exchange, addr_str): + """PATCH /symbols/{address} - Rename primary symbol at address.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + new_name = body.get("name", "") + if not new_name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}} + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + st = self.program.getSymbolTable() + symbol = st.getPrimarySymbol(addr) + if not symbol: + return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}} + + old_name = symbol.getName() + + def do_rename(): + symbol.setName(new_name, SourceType.USER_DEFINED) + + with_transaction(self.program, "Rename symbol", do_rename) + return {"success": True, "result": { + "address": addr_str, + "oldName": old_name, + "newName": new_name, + "message": "Symbol renamed successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}} + + def handle_symbol_delete(self, exchange, addr_str): + """DELETE /symbols/{address} - Delete primary symbol at address.""" + if not self.program: + return self._no_program() + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + st = self.program.getSymbolTable() + symbol = st.getPrimarySymbol(addr) + if not symbol: + return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}} + + name = symbol.getName() + + def do_delete(): + symbol.delete() + + with_transaction(self.program, "Delete symbol", do_delete) + return {"success": True, "result": { + "address": addr_str, + "name": name, + "message": "Symbol deleted successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}} + + # ================================================================== + # Variable Rename Handler + # ================================================================== + + def handle_variable_rename(self, exchange, addr_str, var_name): + """PATCH /functions/{address}/variables/{name} - Rename/retype a variable.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + new_name = body.get("name", "") + new_type = body.get("data_type") + + if not new_name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}} + + try: + # URL-decode the variable name + from java.net import URLDecoder + decoded_name = URLDecoder.decode(var_name, "UTF-8") + + func = self._find_function_at(addr_str) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} + + # Search parameters and local variables + target_var = None + for param in func.getParameters(): + if param.getName() == decoded_name: + target_var = param + break + if not target_var: + for var in func.getAllVariables(): + if var.getName() == decoded_name: + target_var = var + break + + if not target_var: + return {"success": False, "error": {"code": "NOT_FOUND", "message": "Variable '%s' not found in function" % decoded_name}} + + old_name = target_var.getName() + + def do_rename(): + target_var.setName(new_name, SourceType.USER_DEFINED) + if new_type: + dtm = self.program.getDataTypeManager() + dt = resolve_data_type(dtm, new_type) + if dt: + target_var.setDataType(dt, SourceType.USER_DEFINED) + + with_transaction(self.program, "Rename variable", do_rename) + return {"success": True, "result": { + "function": addr_str, + "oldName": old_name, + "newName": new_name, + "message": "Variable renamed successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "VARIABLE_ERROR", "message": str(e)}} + + # ================================================================== + # Variables Handler + # ================================================================== + + def handle_variables(self, exchange): + """GET /variables - List global and function variables.""" + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + global_only = params.get("global_only", "false").lower() == "true" + search = params.get("search", "") + grep_pattern = compile_grep(params) + + variables = [] + count = 0 + skipped = 0 + + # Function variables (parameters + locals) + if not global_only: + fm = self.program.getFunctionManager() + for func in fm.getFunctions(True): + if count >= limit: + break + func_name = func.getName() + func_addr = str(func.getEntryPoint()) + + for param in func.getParameters(): + if count >= limit: + break + var_name = param.getName() + if search and search.lower() not in var_name.lower(): + continue + if skipped < offset: + skipped += 1 + continue + item = { + "name": var_name, + "type": str(param.getDataType()), + "storage": str(param.getVariableStorage()), + "scope": "parameter", + "function": func_name, + "functionAddress": func_addr, + } + if not grep_matches_item(item, grep_pattern): + continue + variables.append(item) + count += 1 + + for var in func.getLocalVariables(): + if count >= limit: + break + var_name = var.getName() + if search and search.lower() not in var_name.lower(): + continue + if skipped < offset: + skipped += 1 + continue + item = { + "name": var_name, + "type": str(var.getDataType()), + "storage": str(var.getVariableStorage()), + "scope": "local", + "function": func_name, + "functionAddress": func_addr, + } + if not grep_matches_item(item, grep_pattern): + continue + variables.append(item) + count += 1 + + # Global variables (defined data with symbol names) + listing = self.program.getListing() + st = self.program.getSymbolTable() + for data in listing.getDefinedData(True): + if count >= limit: + break + addr = data.getAddress() + symbol = st.getPrimarySymbol(addr) + if not symbol: + continue + sym_name = symbol.getName() + # Skip auto-generated names + if sym_name.startswith("DAT_") or sym_name.startswith("s_"): + continue + if search and search.lower() not in sym_name.lower(): + continue + if skipped < offset: + skipped += 1 + continue + item = { + "name": sym_name, + "address": str(addr), + "type": str(data.getDataType()), + "scope": "global", + "size": data.getLength(), + } + if not grep_matches_item(item, grep_pattern): + continue + variables.append(item) + count += 1 + + return {"success": True, "result": variables, "offset": offset, "limit": limit} + + # ================================================================== + # Bookmarks Handlers + # ================================================================== + + def handle_bookmarks(self, exchange): + """GET /bookmarks - List bookmarks with optional filtering.""" + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + type_filter = params.get("type") + category_filter = params.get("category") + grep_pattern = compile_grep(params) + + bm = self.program.getBookmarkManager() + bookmarks = [] + count = 0 + skipped = 0 + + # Get bookmark types to iterate + if type_filter: + bm_types = [type_filter] + else: + bm_types = [str(t) for t in bm.getBookmarkTypes()] + + for btype in bm_types: + if count >= limit: + break + try: + it = bm.getBookmarksIterator(btype) + except: + continue + while it.hasNext() and count < limit: + bookmark = it.next() + if category_filter and bookmark.getCategory() != category_filter: + continue + if skipped < offset: + skipped += 1 + continue + item = { + "address": str(bookmark.getAddress()), + "type": bookmark.getTypeString(), + "category": bookmark.getCategory(), + "comment": bookmark.getComment(), + } + if not grep_matches_item(item, grep_pattern): + continue + bookmarks.append(item) + count += 1 + + return {"success": True, "result": bookmarks, "offset": offset, "limit": limit} + + def handle_bookmark_create(self, exchange): + """POST /bookmarks - Create a bookmark.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + address = body.get("address", "") + btype = body.get("type", "Note") + category = body.get("category", "") + comment = body.get("comment", "") + + if not address: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'address' is required"}} + + try: + addr = self.program.getAddressFactory().getAddress(address) + bm = self.program.getBookmarkManager() + + def do_create(): + bm.setBookmark(addr, btype, category, comment) + + with_transaction(self.program, "Create bookmark", do_create) + return {"success": True, "result": { + "address": address, + "type": btype, + "category": category, + "comment": comment, + "message": "Bookmark created successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}} + + def handle_bookmark_delete(self, exchange, addr_str): + """DELETE /bookmarks/{address} - Delete all bookmarks at address.""" + if not self.program: + return self._no_program() + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + bm = self.program.getBookmarkManager() + removed = [] + + def do_delete(): + for bookmark in list(bm.getBookmarks(addr)): + removed.append(bookmark.getTypeString()) + bookmark.remove() if hasattr(bookmark, 'remove') else bm.removeBookmark(bookmark) + + with_transaction(self.program, "Delete bookmarks", do_delete) + return {"success": True, "result": { + "address": addr_str, + "removedTypes": removed, + "count": len(removed), + "message": "Bookmarks deleted successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}} + + # ================================================================== + # Enum Handlers + # ================================================================== + + def handle_enums(self, exchange): + """GET /datatypes/enums - List enum data types.""" + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + grep_pattern = compile_grep(params) + + from ghidra.program.model.data import Enum as GhidraEnum + + dtm = self.program.getDataTypeManager() + enums = [] + count = 0 + skipped = 0 + + for dt in dtm.getAllDataTypes(): + if count >= limit: + break + if not isinstance(dt, GhidraEnum): + continue + if skipped < offset: + skipped += 1 + continue + + # Get enum members + members = [] + for name in dt.getNames(): + members.append({"name": name, "value": dt.getValue(name)}) + + item = { + "name": dt.getName(), + "category": str(dt.getCategoryPath()), + "size": dt.getLength(), + "members": members, + } + if not grep_matches_item(item, grep_pattern): + continue + enums.append(item) + count += 1 + + return {"success": True, "result": enums, "offset": offset, "limit": limit} + + def handle_enum_create(self, exchange): + """POST /datatypes/enums - Create a new enum.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + name = body.get("name", "") + size = int(body.get("size", 4)) + + if not name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' is required"}} + + try: + from ghidra.program.model.data import EnumDataType, CategoryPath + + dtm = self.program.getDataTypeManager() + new_enum = EnumDataType(name, size) + + def do_create(): + dtm.addDataType(new_enum, None) + + with_transaction(self.program, "Create enum", do_create) + return {"success": True, "result": { + "name": name, + "size": size, + "message": "Enum created successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "ENUM_ERROR", "message": str(e)}} + + # ================================================================== + # Typedef Handlers + # ================================================================== + + def handle_typedefs(self, exchange): + """GET /datatypes/typedefs - List typedef data types.""" + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + grep_pattern = compile_grep(params) + + from ghidra.program.model.data import TypeDef + + dtm = self.program.getDataTypeManager() + typedefs = [] + count = 0 + skipped = 0 + + for dt in dtm.getAllDataTypes(): + if count >= limit: + break + if not isinstance(dt, TypeDef): + continue + if skipped < offset: + skipped += 1 + continue + item = { + "name": dt.getName(), + "category": str(dt.getCategoryPath()), + "baseType": dt.getBaseDataType().getName() if dt.getBaseDataType() else None, + "size": dt.getLength(), + } + if not grep_matches_item(item, grep_pattern): + continue + typedefs.append(item) + count += 1 + + return {"success": True, "result": typedefs, "offset": offset, "limit": limit} + + def handle_typedef_create(self, exchange): + """POST /datatypes/typedefs - Create a new typedef.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + name = body.get("name", "") + base_type_name = body.get("base_type", "") + + if not name or not base_type_name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' and 'base_type' are required"}} + + try: + from ghidra.program.model.data import TypedefDataType + + dtm = self.program.getDataTypeManager() + + # Use the shared resolver which handles builtins + path lookups + base_dt = resolve_data_type(dtm, base_type_name) + if not base_dt: + return {"success": False, "error": {"code": "NOT_FOUND", "message": "Base type not found: %s" % base_type_name}} + + new_typedef = TypedefDataType(name, base_dt) + + def do_create(): + dtm.addDataType(new_typedef, None) + + with_transaction(self.program, "Create typedef", do_create) + return {"success": True, "result": { + "name": name, + "baseType": base_type_name, + "message": "Typedef created successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "TYPEDEF_ERROR", "message": str(e)}} + # ================================================================== # Legacy Compatibility # ================================================================== diff --git a/src/ghydramcp/config.py b/src/ghydramcp/config.py index 50f3115..4228a13 100644 --- a/src/ghydramcp/config.py +++ b/src/ghydramcp/config.py @@ -102,6 +102,9 @@ class GhydraConfig: "variables": 1000, "namespaces": 500, "classes": 500, + "bookmarks": 1000, + "enums": 500, + "typedefs": 500, }) def __post_init__(self): diff --git a/src/ghydramcp/mixins/__init__.py b/src/ghydramcp/mixins/__init__.py index 4a2530c..a839bcd 100644 --- a/src/ghydramcp/mixins/__init__.py +++ b/src/ghydramcp/mixins/__init__.py @@ -6,8 +6,10 @@ Uses FastMCP's contrib.mcp_mixin pattern for clean modular organization. from .analysis import AnalysisMixin from .base import GhydraMixinBase +from .bookmarks import BookmarksMixin from .cursors import CursorsMixin from .data import DataMixin +from .datatypes import DataTypesMixin from .docker import DockerMixin from .functions import FunctionsMixin from .instances import InstancesMixin @@ -34,4 +36,6 @@ __all__ = [ "SegmentsMixin", "VariablesMixin", "NamespacesMixin", + "BookmarksMixin", + "DataTypesMixin", ] diff --git a/src/ghydramcp/mixins/bookmarks.py b/src/ghydramcp/mixins/bookmarks.py new file mode 100644 index 0000000..aece74b --- /dev/null +++ b/src/ghydramcp/mixins/bookmarks.py @@ -0,0 +1,171 @@ +"""Bookmarks mixin for GhydraMCP. + +Provides tools for managing Ghidra bookmarks (annotations at addresses). +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class BookmarksMixin(GhydraMixinBase): + """Mixin for bookmark operations. + + Provides tools for: + - Listing bookmarks with type/category filtering + - Creating bookmarks at addresses + - Deleting bookmarks + """ + + @mcp_tool() + def bookmarks_list( + self, + type: Optional[str] = None, + category: Optional[str] = None, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List bookmarks with optional type/category filtering. + + Args: + type: Filter by bookmark type (e.g. "Note", "Warning", "Error", "Info") + category: Filter by bookmark category + port: Ghidra instance port (optional) + page_size: Bookmarks per page (default: 50, max: 500) + grep: Regex pattern to filter bookmark comments + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all bookmarks without pagination + fields: Field names to keep (e.g. ['address', 'type', 'comment']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of bookmarks + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("bookmarks", 1000) + params: Dict[str, Any] = {"limit": cap} + if type: + params["type"] = type + if category: + params["category"] = category + + response = self.safe_get(port, "bookmarks", params) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + bookmarks = simplified.get("result", []) + if not isinstance(bookmarks, list): + bookmarks = [] + + query_params = { + "tool": "bookmarks_list", + "port": port, + "type": type, + "category": category, + "grep": grep, + } + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=bookmarks, + query_params=query_params, + tool_name="bookmarks_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def bookmarks_create( + self, + address: str, + type: str = "Note", + category: str = "", + comment: str = "", + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Create a bookmark at the specified address. + + Args: + address: Memory address in hex format + type: Bookmark type (default: "Note"). Common types: Note, Warning, Error, Info + category: Bookmark category (optional grouping string) + comment: Bookmark comment text + port: Ghidra instance port (optional) + + Returns: + Created bookmark information + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = { + "address": address, + "type": type, + "category": category, + "comment": comment, + } + response = self.safe_post(port, "bookmarks", payload) + return self.simplify_response(response) + + @mcp_tool() + def bookmarks_delete( + self, + address: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Delete all bookmarks at the specified address. + + Args: + address: Memory address in hex format + port: Ghidra instance port (optional) + + Returns: + Operation result + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + response = self.safe_delete(port, f"bookmarks/{address}") + return self.simplify_response(response) diff --git a/src/ghydramcp/mixins/datatypes.py b/src/ghydramcp/mixins/datatypes.py new file mode 100644 index 0000000..79c61d8 --- /dev/null +++ b/src/ghydramcp/mixins/datatypes.py @@ -0,0 +1,217 @@ +"""Data types mixin for GhydraMCP. + +Provides tools for managing enum and typedef data types. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class DataTypesMixin(GhydraMixinBase): + """Mixin for enum and typedef data type operations. + + Provides tools for: + - Listing and creating enum data types + - Listing and creating typedef data types + """ + + # --- Enums --- + + @mcp_tool() + def enums_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List enum data types with their members. + + Args: + port: Ghidra instance port (optional) + page_size: Enums per page (default: 50, max: 500) + grep: Regex pattern to filter enum names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all enums without pagination + fields: Field names to keep (e.g. ['name', 'size', 'members']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of enum data types + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("enums", 500) + response = self.safe_get(port, "datatypes/enums", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + enums = simplified.get("result", []) + if not isinstance(enums, list): + enums = [] + + query_params = {"tool": "enums_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=enums, + query_params=query_params, + tool_name="enums_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def enums_create( + self, + name: str, + size: int = 4, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Create a new enum data type. + + Args: + name: Name for the new enum + size: Size in bytes (default: 4) + port: Ghidra instance port (optional) + + Returns: + Created enum information + """ + if not name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "name parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = {"name": name, "size": size} + response = self.safe_post(port, "datatypes/enums", payload) + return self.simplify_response(response) + + # --- Typedefs --- + + @mcp_tool() + def typedefs_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List typedef data types. + + Args: + port: Ghidra instance port (optional) + page_size: Typedefs per page (default: 50, max: 500) + grep: Regex pattern to filter typedef names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all typedefs without pagination + fields: Field names to keep (e.g. ['name', 'base_type']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of typedef data types + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("typedefs", 500) + response = self.safe_get(port, "datatypes/typedefs", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + typedefs = simplified.get("result", []) + if not isinstance(typedefs, list): + typedefs = [] + + query_params = {"tool": "typedefs_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=typedefs, + query_params=query_params, + tool_name="typedefs_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def typedefs_create( + self, + name: str, + base_type: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Create a new typedef data type. + + Args: + name: Name for the new typedef + base_type: Name of the base data type (e.g. "int", "uint32_t", "char*") + port: Ghidra instance port (optional) + + Returns: + Created typedef information + """ + if not name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "name parameter is required", + }, + } + if not base_type: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "base_type parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = {"name": name, "base_type": base_type} + response = self.safe_post(port, "datatypes/typedefs", payload) + return self.simplify_response(response) diff --git a/src/ghydramcp/mixins/docker.py b/src/ghydramcp/mixins/docker.py index 243e165..7626e42 100644 --- a/src/ghydramcp/mixins/docker.py +++ b/src/ghydramcp/mixins/docker.py @@ -925,10 +925,20 @@ class DockerMixin(MCPMixin): Returns: Instance connection info with session ID and port details """ + import os + + requested_name = os.path.basename(binary_path) + + def _is_same_binary(health_program: str) -> bool: + """Check if a running instance has the same binary loaded.""" + if not health_program: + return False + return os.path.basename(health_program) == requested_name + # If port is specified, check that specific port if port is not None: health = await self.docker_health(port=port, ctx=ctx) - if health.get("healthy"): + if health.get("healthy") and _is_same_binary(health.get("program", "")): return { "source": "existing", "session_id": self.session_id, @@ -938,10 +948,10 @@ class DockerMixin(MCPMixin): "message": "Using existing Ghidra instance", } else: - # Check all pooled ports for an existing instance + # Check all pooled ports for an instance with the SAME binary for check_port in range(PORT_POOL_START, PORT_POOL_END + 1): health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx) - if health.get("healthy"): + if health.get("healthy") and _is_same_binary(health.get("program", "")): return { "source": "existing", "session_id": self.session_id, diff --git a/src/ghydramcp/mixins/symbols.py b/src/ghydramcp/mixins/symbols.py index 568c293..4f1ee0a 100644 --- a/src/ghydramcp/mixins/symbols.py +++ b/src/ghydramcp/mixins/symbols.py @@ -192,6 +192,124 @@ class SymbolsMixin(GhydraMixinBase): fields=fields, ) + @mcp_tool() + def symbols_create( + self, + name: str, + address: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Create a new label/symbol at the specified address. + + Args: + name: Name for the new symbol + address: Memory address in hex format + port: Ghidra instance port (optional) + + Returns: + Created symbol information + """ + if not name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "name parameter is required", + }, + } + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = {"name": name, "address": address} + response = self.safe_post(port, "symbols", payload) + return self.simplify_response(response) + + @mcp_tool() + def symbols_rename( + self, + address: str, + new_name: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Rename the primary symbol at the specified address. + + Args: + address: Memory address of the symbol in hex format + new_name: New name for the symbol + port: Ghidra instance port (optional) + + Returns: + Operation result with old and new names + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + if not new_name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "new_name parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = {"name": new_name} + response = self.safe_patch(port, f"symbols/{address}", payload) + return self.simplify_response(response) + + @mcp_tool() + def symbols_delete( + self, + address: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Delete the primary symbol at the specified address. + + Args: + address: Memory address of the symbol in hex format + port: Ghidra instance port (optional) + + Returns: + Operation result + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + response = self.safe_delete(port, f"symbols/{address}") + return self.simplify_response(response) + # Resources @mcp_resource(uri="ghidra://instance/{port}/symbols") diff --git a/src/ghydramcp/mixins/variables.py b/src/ghydramcp/mixins/variables.py index 36bc7e0..a8c7707 100644 --- a/src/ghydramcp/mixins/variables.py +++ b/src/ghydramcp/mixins/variables.py @@ -160,6 +160,67 @@ class VariablesMixin(GhydraMixinBase): fields=fields, ) + @mcp_tool() + def variables_rename( + self, + function_address: str, + variable_name: str, + new_name: str, + new_type: Optional[str] = None, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Rename a variable (and optionally retype) within a function. + + Args: + function_address: Function address in hex format + variable_name: Current name of the variable + new_name: New name for the variable + new_type: New data type (optional, e.g. "int", "char*") + port: Ghidra instance port (optional) + + Returns: + Operation result + """ + if not function_address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "function_address parameter is required", + }, + } + if not variable_name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "variable_name parameter is required", + }, + } + if not new_name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "new_name parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + from urllib.parse import quote + + payload: dict = {"name": new_name} + if new_type: + payload["data_type"] = new_type + + endpoint = f"functions/{function_address}/variables/{quote(variable_name)}" + response = self.safe_patch(port, endpoint, payload) + return self.simplify_response(response) + # Resources @mcp_resource(uri="ghidra://instance/{port}/variables") diff --git a/src/ghydramcp/server.py b/src/ghydramcp/server.py index acc3787..767572d 100644 --- a/src/ghydramcp/server.py +++ b/src/ghydramcp/server.py @@ -16,8 +16,10 @@ from fastmcp import FastMCP from .config import GhydraConfig, get_config, set_config from .mixins import ( AnalysisMixin, + BookmarksMixin, CursorsMixin, DataMixin, + DataTypesMixin, DockerMixin, FunctionsMixin, InstancesMixin, @@ -64,6 +66,8 @@ def create_server( segments_mixin = SegmentsMixin() variables_mixin = VariablesMixin() namespaces_mixin = NamespacesMixin() + bookmarks_mixin = BookmarksMixin() + datatypes_mixin = DataTypesMixin() # Register all mixins with the server # Each mixin registers its tools, resources, and prompts @@ -80,6 +84,8 @@ def create_server( segments_mixin.register_all(mcp) variables_mixin.register_all(mcp) namespaces_mixin.register_all(mcp) + bookmarks_mixin.register_all(mcp) + datatypes_mixin.register_all(mcp) # Optional feedback collection cfg = get_config()