feat: Complete Sprint 3+4 — CRUD operations, bookmarks, enums, typedefs

Sprint 3 (Symbol & Variable CRUD):
- Add symbols_create, symbols_rename, symbols_delete MCP tools
- Add variables_rename MCP tool with optional type change
- Implement corresponding Jython HTTP handlers in headless server

Sprint 4 (Bookmarks & Data Types):
- Add BookmarksMixin (bookmarks_list, bookmarks_create, bookmarks_delete)
- Add DataTypesMixin (enums_list, enums_create, typedefs_list, typedefs_create)
- Register both mixins in server.py, add resource caps in config.py

Fixes:
- Use resolve_data_type() for typedef creation and variable retyping
  (was missing builtin types like int, char, void)
- Fix docker_auto_start reusing containers with wrong binary loaded
  (now compares requested binary name against running instance)

Headless server (GhydraMCPServer.py): +14 routes, 58 total
MCP tools: 75 registered
Tested: 24/24 endpoint tests passing
This commit is contained in:
Ryan Malloy 2026-01-31 15:16:39 -07:00
parent 0d25a0dc24
commit c93abaf86c
9 changed files with 1194 additions and 7 deletions

View File

@ -305,6 +305,7 @@ ROUTES = [
("GET", r"^/strings$", "handle_strings"), ("GET", r"^/strings$", "handle_strings"),
# Memory # Memory
("GET", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_get_comment"),
("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"), ("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"),
("GET", r"^/memory/blocks$", "handle_memory_blocks"), ("GET", r"^/memory/blocks$", "handle_memory_blocks"),
("GET", r"^/memory$", "handle_memory_read"), ("GET", r"^/memory$", "handle_memory_read"),
@ -316,8 +317,26 @@ ROUTES = [
# Symbols # Symbols
("GET", r"^/symbols/imports$", "handle_imports"), ("GET", r"^/symbols/imports$", "handle_imports"),
("GET", r"^/symbols/exports$", "handle_exports"), ("GET", r"^/symbols/exports$", "handle_exports"),
("POST", r"^/symbols$", "handle_symbol_create"),
("PATCH", r"^/symbols/([^/]+)$", "handle_symbol_rename"),
("DELETE", r"^/symbols/([^/]+)$", "handle_symbol_delete"),
("GET", r"^/symbols$", "handle_symbols"), ("GET", r"^/symbols$", "handle_symbols"),
# Variables
("PATCH", r"^/functions/([^/]+)/variables/([^/]+)$", "handle_variable_rename"),
("GET", r"^/variables$", "handle_variables"),
# Bookmarks
("POST", r"^/bookmarks$", "handle_bookmark_create"),
("DELETE", r"^/bookmarks/([^/]+)$", "handle_bookmark_delete"),
("GET", r"^/bookmarks$", "handle_bookmarks"),
# Data types
("POST", r"^/datatypes/enums$", "handle_enum_create"),
("GET", r"^/datatypes/enums$", "handle_enums"),
("POST", r"^/datatypes/typedefs$", "handle_typedef_create"),
("GET", r"^/datatypes/typedefs$", "handle_typedefs"),
# Cross-references # Cross-references
("GET", r"^/xrefs$", "handle_xrefs"), ("GET", r"^/xrefs$", "handle_xrefs"),
@ -734,26 +753,62 @@ class GhydraMCPHandler(HttpHandler):
limit = parse_int(params.get("limit"), 100) limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0) offset = parse_int(params.get("offset"), 0)
name_filter = params.get("name") name_filter = params.get("name")
name_contains = params.get("name_contains")
name_regex = params.get("name_matches_regex")
addr_filter = params.get("addr")
grep_pattern = compile_grep(params) grep_pattern = compile_grep(params)
functions = [] # Compile name regex if provided
name_regex_pat = None
if name_regex:
try:
name_regex_pat = re.compile(name_regex, re.IGNORECASE)
except:
return {"success": False, "error": {"code": "INVALID_REGEX", "message": "Invalid regex: %s" % name_regex}}
fm = self.program.getFunctionManager() fm = self.program.getFunctionManager()
total = fm.getFunctionCount() total = fm.getFunctionCount()
# Single function lookup by address
if addr_filter:
func = self._find_function_at(addr_filter)
if not func:
return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit}
addr = str(func.getEntryPoint())
return {"success": True, "result": [{
"name": func.getName(),
"address": addr,
"signature": str(func.getSignature()),
"parameterCount": func.getParameterCount(),
"isThunk": func.isThunk(),
"_links": {
"self": make_link("/functions/%s" % addr),
"decompile": make_link("/functions/%s/decompile" % addr),
"disassembly": make_link("/functions/%s/disassembly" % addr),
},
}], "size": 1, "offset": 0, "limit": limit}
functions = []
count = 0 count = 0
skipped = 0 skipped = 0
for func in fm.getFunctions(True): for func in fm.getFunctions(True):
if count >= limit: if count >= limit:
break break
# Apply name filter func_name = func.getName()
if name_filter and name_filter.lower() not in func.getName().lower(): # Apply name filters
if name_filter and name_filter.lower() not in func_name.lower():
continue
if name_contains and name_contains.lower() not in func_name.lower():
continue
if name_regex_pat and not name_regex_pat.search(func_name):
continue continue
if skipped < offset: if skipped < offset:
skipped += 1 skipped += 1
continue continue
addr = str(func.getEntryPoint()) addr = str(func.getEntryPoint())
item = { item = {
"name": func.getName(), "name": func_name,
"address": addr, "address": addr,
"signature": str(func.getSignature()), "signature": str(func.getSignature()),
"parameterCount": func.getParameterCount(), "parameterCount": func.getParameterCount(),
@ -1358,6 +1413,36 @@ class GhydraMCPHandler(HttpHandler):
"""List memory blocks (alias for /segments).""" """List memory blocks (alias for /segments)."""
return self.handle_segments(exchange) return self.handle_segments(exchange)
def handle_get_comment(self, exchange, addr_str, comment_type):
"""Get a comment at a specific address."""
if not self.program:
return self._no_program()
ct = COMMENT_TYPE_MAP.get(comment_type.lower())
if ct is None:
return {"success": False, "error": {
"code": "INVALID_COMMENT_TYPE",
"message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type
}}
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
listing = self.program.getListing()
cu = listing.getCodeUnitAt(addr)
if not cu:
cu = listing.getCodeUnitContaining(addr)
if not cu:
return {"success": True, "result": {"address": addr_str, "commentType": comment_type, "comment": None}}
comment = cu.getComment(ct)
return {"success": True, "result": {
"address": addr_str,
"commentType": comment_type,
"comment": comment,
}}
except Exception as e:
return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}}
def handle_set_comment(self, exchange, addr_str, comment_type): def handle_set_comment(self, exchange, addr_str, comment_type):
"""Set a comment at a specific address.""" """Set a comment at a specific address."""
if not self.program: if not self.program:
@ -2112,6 +2197,518 @@ class GhydraMCPHandler(HttpHandler):
except Exception as e: except Exception as e:
return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}} return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}}
# ==================================================================
# Symbol CRUD Handlers
# ==================================================================
def handle_symbol_create(self, exchange):
"""POST /symbols - Create a new label/symbol."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
name = body.get("name", "")
address = body.get("address", "")
if not name or not address:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Both 'name' and 'address' are required"}}
try:
addr = self.program.getAddressFactory().getAddress(address)
st = self.program.getSymbolTable()
def do_create():
st.createLabel(addr, name, SourceType.USER_DEFINED)
with_transaction(self.program, "Create symbol", do_create)
return {"success": True, "result": {
"name": name,
"address": address,
"message": "Symbol created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
def handle_symbol_rename(self, exchange, addr_str):
"""PATCH /symbols/{address} - Rename primary symbol at address."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
new_name = body.get("name", "")
if not new_name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
st = self.program.getSymbolTable()
symbol = st.getPrimarySymbol(addr)
if not symbol:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}
old_name = symbol.getName()
def do_rename():
symbol.setName(new_name, SourceType.USER_DEFINED)
with_transaction(self.program, "Rename symbol", do_rename)
return {"success": True, "result": {
"address": addr_str,
"oldName": old_name,
"newName": new_name,
"message": "Symbol renamed successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
def handle_symbol_delete(self, exchange, addr_str):
"""DELETE /symbols/{address} - Delete primary symbol at address."""
if not self.program:
return self._no_program()
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
st = self.program.getSymbolTable()
symbol = st.getPrimarySymbol(addr)
if not symbol:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}
name = symbol.getName()
def do_delete():
symbol.delete()
with_transaction(self.program, "Delete symbol", do_delete)
return {"success": True, "result": {
"address": addr_str,
"name": name,
"message": "Symbol deleted successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
# ==================================================================
# Variable Rename Handler
# ==================================================================
def handle_variable_rename(self, exchange, addr_str, var_name):
"""PATCH /functions/{address}/variables/{name} - Rename/retype a variable."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
new_name = body.get("name", "")
new_type = body.get("data_type")
if not new_name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}
try:
# URL-decode the variable name
from java.net import URLDecoder
decoded_name = URLDecoder.decode(var_name, "UTF-8")
func = self._find_function_at(addr_str)
if not func:
return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}}
# Search parameters and local variables
target_var = None
for param in func.getParameters():
if param.getName() == decoded_name:
target_var = param
break
if not target_var:
for var in func.getAllVariables():
if var.getName() == decoded_name:
target_var = var
break
if not target_var:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "Variable '%s' not found in function" % decoded_name}}
old_name = target_var.getName()
def do_rename():
target_var.setName(new_name, SourceType.USER_DEFINED)
if new_type:
dtm = self.program.getDataTypeManager()
dt = resolve_data_type(dtm, new_type)
if dt:
target_var.setDataType(dt, SourceType.USER_DEFINED)
with_transaction(self.program, "Rename variable", do_rename)
return {"success": True, "result": {
"function": addr_str,
"oldName": old_name,
"newName": new_name,
"message": "Variable renamed successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "VARIABLE_ERROR", "message": str(e)}}
# ==================================================================
# Variables Handler
# ==================================================================
def handle_variables(self, exchange):
"""GET /variables - List global and function variables."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
global_only = params.get("global_only", "false").lower() == "true"
search = params.get("search", "")
grep_pattern = compile_grep(params)
variables = []
count = 0
skipped = 0
# Function variables (parameters + locals)
if not global_only:
fm = self.program.getFunctionManager()
for func in fm.getFunctions(True):
if count >= limit:
break
func_name = func.getName()
func_addr = str(func.getEntryPoint())
for param in func.getParameters():
if count >= limit:
break
var_name = param.getName()
if search and search.lower() not in var_name.lower():
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": var_name,
"type": str(param.getDataType()),
"storage": str(param.getVariableStorage()),
"scope": "parameter",
"function": func_name,
"functionAddress": func_addr,
}
if not grep_matches_item(item, grep_pattern):
continue
variables.append(item)
count += 1
for var in func.getLocalVariables():
if count >= limit:
break
var_name = var.getName()
if search and search.lower() not in var_name.lower():
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": var_name,
"type": str(var.getDataType()),
"storage": str(var.getVariableStorage()),
"scope": "local",
"function": func_name,
"functionAddress": func_addr,
}
if not grep_matches_item(item, grep_pattern):
continue
variables.append(item)
count += 1
# Global variables (defined data with symbol names)
listing = self.program.getListing()
st = self.program.getSymbolTable()
for data in listing.getDefinedData(True):
if count >= limit:
break
addr = data.getAddress()
symbol = st.getPrimarySymbol(addr)
if not symbol:
continue
sym_name = symbol.getName()
# Skip auto-generated names
if sym_name.startswith("DAT_") or sym_name.startswith("s_"):
continue
if search and search.lower() not in sym_name.lower():
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": sym_name,
"address": str(addr),
"type": str(data.getDataType()),
"scope": "global",
"size": data.getLength(),
}
if not grep_matches_item(item, grep_pattern):
continue
variables.append(item)
count += 1
return {"success": True, "result": variables, "offset": offset, "limit": limit}
# ==================================================================
# Bookmarks Handlers
# ==================================================================
def handle_bookmarks(self, exchange):
"""GET /bookmarks - List bookmarks with optional filtering."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
type_filter = params.get("type")
category_filter = params.get("category")
grep_pattern = compile_grep(params)
bm = self.program.getBookmarkManager()
bookmarks = []
count = 0
skipped = 0
# Get bookmark types to iterate
if type_filter:
bm_types = [type_filter]
else:
bm_types = [str(t) for t in bm.getBookmarkTypes()]
for btype in bm_types:
if count >= limit:
break
try:
it = bm.getBookmarksIterator(btype)
except:
continue
while it.hasNext() and count < limit:
bookmark = it.next()
if category_filter and bookmark.getCategory() != category_filter:
continue
if skipped < offset:
skipped += 1
continue
item = {
"address": str(bookmark.getAddress()),
"type": bookmark.getTypeString(),
"category": bookmark.getCategory(),
"comment": bookmark.getComment(),
}
if not grep_matches_item(item, grep_pattern):
continue
bookmarks.append(item)
count += 1
return {"success": True, "result": bookmarks, "offset": offset, "limit": limit}
def handle_bookmark_create(self, exchange):
"""POST /bookmarks - Create a bookmark."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
address = body.get("address", "")
btype = body.get("type", "Note")
category = body.get("category", "")
comment = body.get("comment", "")
if not address:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'address' is required"}}
try:
addr = self.program.getAddressFactory().getAddress(address)
bm = self.program.getBookmarkManager()
def do_create():
bm.setBookmark(addr, btype, category, comment)
with_transaction(self.program, "Create bookmark", do_create)
return {"success": True, "result": {
"address": address,
"type": btype,
"category": category,
"comment": comment,
"message": "Bookmark created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}
def handle_bookmark_delete(self, exchange, addr_str):
"""DELETE /bookmarks/{address} - Delete all bookmarks at address."""
if not self.program:
return self._no_program()
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
bm = self.program.getBookmarkManager()
removed = []
def do_delete():
for bookmark in list(bm.getBookmarks(addr)):
removed.append(bookmark.getTypeString())
bookmark.remove() if hasattr(bookmark, 'remove') else bm.removeBookmark(bookmark)
with_transaction(self.program, "Delete bookmarks", do_delete)
return {"success": True, "result": {
"address": addr_str,
"removedTypes": removed,
"count": len(removed),
"message": "Bookmarks deleted successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}
# ==================================================================
# Enum Handlers
# ==================================================================
def handle_enums(self, exchange):
"""GET /datatypes/enums - List enum data types."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
grep_pattern = compile_grep(params)
from ghidra.program.model.data import Enum as GhidraEnum
dtm = self.program.getDataTypeManager()
enums = []
count = 0
skipped = 0
for dt in dtm.getAllDataTypes():
if count >= limit:
break
if not isinstance(dt, GhidraEnum):
continue
if skipped < offset:
skipped += 1
continue
# Get enum members
members = []
for name in dt.getNames():
members.append({"name": name, "value": dt.getValue(name)})
item = {
"name": dt.getName(),
"category": str(dt.getCategoryPath()),
"size": dt.getLength(),
"members": members,
}
if not grep_matches_item(item, grep_pattern):
continue
enums.append(item)
count += 1
return {"success": True, "result": enums, "offset": offset, "limit": limit}
def handle_enum_create(self, exchange):
"""POST /datatypes/enums - Create a new enum."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
name = body.get("name", "")
size = int(body.get("size", 4))
if not name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' is required"}}
try:
from ghidra.program.model.data import EnumDataType, CategoryPath
dtm = self.program.getDataTypeManager()
new_enum = EnumDataType(name, size)
def do_create():
dtm.addDataType(new_enum, None)
with_transaction(self.program, "Create enum", do_create)
return {"success": True, "result": {
"name": name,
"size": size,
"message": "Enum created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "ENUM_ERROR", "message": str(e)}}
# ==================================================================
# Typedef Handlers
# ==================================================================
def handle_typedefs(self, exchange):
"""GET /datatypes/typedefs - List typedef data types."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
grep_pattern = compile_grep(params)
from ghidra.program.model.data import TypeDef
dtm = self.program.getDataTypeManager()
typedefs = []
count = 0
skipped = 0
for dt in dtm.getAllDataTypes():
if count >= limit:
break
if not isinstance(dt, TypeDef):
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": dt.getName(),
"category": str(dt.getCategoryPath()),
"baseType": dt.getBaseDataType().getName() if dt.getBaseDataType() else None,
"size": dt.getLength(),
}
if not grep_matches_item(item, grep_pattern):
continue
typedefs.append(item)
count += 1
return {"success": True, "result": typedefs, "offset": offset, "limit": limit}
def handle_typedef_create(self, exchange):
"""POST /datatypes/typedefs - Create a new typedef."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
name = body.get("name", "")
base_type_name = body.get("base_type", "")
if not name or not base_type_name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' and 'base_type' are required"}}
try:
from ghidra.program.model.data import TypedefDataType
dtm = self.program.getDataTypeManager()
# Use the shared resolver which handles builtins + path lookups
base_dt = resolve_data_type(dtm, base_type_name)
if not base_dt:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "Base type not found: %s" % base_type_name}}
new_typedef = TypedefDataType(name, base_dt)
def do_create():
dtm.addDataType(new_typedef, None)
with_transaction(self.program, "Create typedef", do_create)
return {"success": True, "result": {
"name": name,
"baseType": base_type_name,
"message": "Typedef created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "TYPEDEF_ERROR", "message": str(e)}}
# ================================================================== # ==================================================================
# Legacy Compatibility # Legacy Compatibility
# ================================================================== # ==================================================================

View File

@ -102,6 +102,9 @@ class GhydraConfig:
"variables": 1000, "variables": 1000,
"namespaces": 500, "namespaces": 500,
"classes": 500, "classes": 500,
"bookmarks": 1000,
"enums": 500,
"typedefs": 500,
}) })
def __post_init__(self): def __post_init__(self):

View File

@ -6,8 +6,10 @@ Uses FastMCP's contrib.mcp_mixin pattern for clean modular organization.
from .analysis import AnalysisMixin from .analysis import AnalysisMixin
from .base import GhydraMixinBase from .base import GhydraMixinBase
from .bookmarks import BookmarksMixin
from .cursors import CursorsMixin from .cursors import CursorsMixin
from .data import DataMixin from .data import DataMixin
from .datatypes import DataTypesMixin
from .docker import DockerMixin from .docker import DockerMixin
from .functions import FunctionsMixin from .functions import FunctionsMixin
from .instances import InstancesMixin from .instances import InstancesMixin
@ -34,4 +36,6 @@ __all__ = [
"SegmentsMixin", "SegmentsMixin",
"VariablesMixin", "VariablesMixin",
"NamespacesMixin", "NamespacesMixin",
"BookmarksMixin",
"DataTypesMixin",
] ]

View File

@ -0,0 +1,171 @@
"""Bookmarks mixin for GhydraMCP.
Provides tools for managing Ghidra bookmarks (annotations at addresses).
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class BookmarksMixin(GhydraMixinBase):
"""Mixin for bookmark operations.
Provides tools for:
- Listing bookmarks with type/category filtering
- Creating bookmarks at addresses
- Deleting bookmarks
"""
@mcp_tool()
def bookmarks_list(
self,
type: Optional[str] = None,
category: Optional[str] = None,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List bookmarks with optional type/category filtering.
Args:
type: Filter by bookmark type (e.g. "Note", "Warning", "Error", "Info")
category: Filter by bookmark category
port: Ghidra instance port (optional)
page_size: Bookmarks per page (default: 50, max: 500)
grep: Regex pattern to filter bookmark comments
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all bookmarks without pagination
fields: Field names to keep (e.g. ['address', 'type', 'comment']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of bookmarks
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("bookmarks", 1000)
params: Dict[str, Any] = {"limit": cap}
if type:
params["type"] = type
if category:
params["category"] = category
response = self.safe_get(port, "bookmarks", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
bookmarks = simplified.get("result", [])
if not isinstance(bookmarks, list):
bookmarks = []
query_params = {
"tool": "bookmarks_list",
"port": port,
"type": type,
"category": category,
"grep": grep,
}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=bookmarks,
query_params=query_params,
tool_name="bookmarks_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def bookmarks_create(
self,
address: str,
type: str = "Note",
category: str = "",
comment: str = "",
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a bookmark at the specified address.
Args:
address: Memory address in hex format
type: Bookmark type (default: "Note"). Common types: Note, Warning, Error, Info
category: Bookmark category (optional grouping string)
comment: Bookmark comment text
port: Ghidra instance port (optional)
Returns:
Created bookmark information
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {
"address": address,
"type": type,
"category": category,
"comment": comment,
}
response = self.safe_post(port, "bookmarks", payload)
return self.simplify_response(response)
@mcp_tool()
def bookmarks_delete(
self,
address: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Delete all bookmarks at the specified address.
Args:
address: Memory address in hex format
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_delete(port, f"bookmarks/{address}")
return self.simplify_response(response)

View File

@ -0,0 +1,217 @@
"""Data types mixin for GhydraMCP.
Provides tools for managing enum and typedef data types.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class DataTypesMixin(GhydraMixinBase):
"""Mixin for enum and typedef data type operations.
Provides tools for:
- Listing and creating enum data types
- Listing and creating typedef data types
"""
# --- Enums ---
@mcp_tool()
def enums_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List enum data types with their members.
Args:
port: Ghidra instance port (optional)
page_size: Enums per page (default: 50, max: 500)
grep: Regex pattern to filter enum names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all enums without pagination
fields: Field names to keep (e.g. ['name', 'size', 'members']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of enum data types
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("enums", 500)
response = self.safe_get(port, "datatypes/enums", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
enums = simplified.get("result", [])
if not isinstance(enums, list):
enums = []
query_params = {"tool": "enums_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=enums,
query_params=query_params,
tool_name="enums_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def enums_create(
self,
name: str,
size: int = 4,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new enum data type.
Args:
name: Name for the new enum
size: Size in bytes (default: 4)
port: Ghidra instance port (optional)
Returns:
Created enum information
"""
if not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "name parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": name, "size": size}
response = self.safe_post(port, "datatypes/enums", payload)
return self.simplify_response(response)
# --- Typedefs ---
@mcp_tool()
def typedefs_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List typedef data types.
Args:
port: Ghidra instance port (optional)
page_size: Typedefs per page (default: 50, max: 500)
grep: Regex pattern to filter typedef names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all typedefs without pagination
fields: Field names to keep (e.g. ['name', 'base_type']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of typedef data types
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("typedefs", 500)
response = self.safe_get(port, "datatypes/typedefs", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
typedefs = simplified.get("result", [])
if not isinstance(typedefs, list):
typedefs = []
query_params = {"tool": "typedefs_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=typedefs,
query_params=query_params,
tool_name="typedefs_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def typedefs_create(
self,
name: str,
base_type: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new typedef data type.
Args:
name: Name for the new typedef
base_type: Name of the base data type (e.g. "int", "uint32_t", "char*")
port: Ghidra instance port (optional)
Returns:
Created typedef information
"""
if not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "name parameter is required",
},
}
if not base_type:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "base_type parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": name, "base_type": base_type}
response = self.safe_post(port, "datatypes/typedefs", payload)
return self.simplify_response(response)

View File

@ -925,10 +925,20 @@ class DockerMixin(MCPMixin):
Returns: Returns:
Instance connection info with session ID and port details Instance connection info with session ID and port details
""" """
import os
requested_name = os.path.basename(binary_path)
def _is_same_binary(health_program: str) -> bool:
"""Check if a running instance has the same binary loaded."""
if not health_program:
return False
return os.path.basename(health_program) == requested_name
# If port is specified, check that specific port # If port is specified, check that specific port
if port is not None: if port is not None:
health = await self.docker_health(port=port, ctx=ctx) health = await self.docker_health(port=port, ctx=ctx)
if health.get("healthy"): if health.get("healthy") and _is_same_binary(health.get("program", "")):
return { return {
"source": "existing", "source": "existing",
"session_id": self.session_id, "session_id": self.session_id,
@ -938,10 +948,10 @@ class DockerMixin(MCPMixin):
"message": "Using existing Ghidra instance", "message": "Using existing Ghidra instance",
} }
else: else:
# Check all pooled ports for an existing instance # Check all pooled ports for an instance with the SAME binary
for check_port in range(PORT_POOL_START, PORT_POOL_END + 1): for check_port in range(PORT_POOL_START, PORT_POOL_END + 1):
health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx) health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx)
if health.get("healthy"): if health.get("healthy") and _is_same_binary(health.get("program", "")):
return { return {
"source": "existing", "source": "existing",
"session_id": self.session_id, "session_id": self.session_id,

View File

@ -192,6 +192,124 @@ class SymbolsMixin(GhydraMixinBase):
fields=fields, fields=fields,
) )
@mcp_tool()
def symbols_create(
self,
name: str,
address: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new label/symbol at the specified address.
Args:
name: Name for the new symbol
address: Memory address in hex format
port: Ghidra instance port (optional)
Returns:
Created symbol information
"""
if not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "name parameter is required",
},
}
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": name, "address": address}
response = self.safe_post(port, "symbols", payload)
return self.simplify_response(response)
@mcp_tool()
def symbols_rename(
self,
address: str,
new_name: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Rename the primary symbol at the specified address.
Args:
address: Memory address of the symbol in hex format
new_name: New name for the symbol
port: Ghidra instance port (optional)
Returns:
Operation result with old and new names
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
if not new_name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "new_name parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": new_name}
response = self.safe_patch(port, f"symbols/{address}", payload)
return self.simplify_response(response)
@mcp_tool()
def symbols_delete(
self,
address: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Delete the primary symbol at the specified address.
Args:
address: Memory address of the symbol in hex format
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_delete(port, f"symbols/{address}")
return self.simplify_response(response)
# Resources # Resources
@mcp_resource(uri="ghidra://instance/{port}/symbols") @mcp_resource(uri="ghidra://instance/{port}/symbols")

View File

@ -160,6 +160,67 @@ class VariablesMixin(GhydraMixinBase):
fields=fields, fields=fields,
) )
@mcp_tool()
def variables_rename(
self,
function_address: str,
variable_name: str,
new_name: str,
new_type: Optional[str] = None,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Rename a variable (and optionally retype) within a function.
Args:
function_address: Function address in hex format
variable_name: Current name of the variable
new_name: New name for the variable
new_type: New data type (optional, e.g. "int", "char*")
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not function_address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "function_address parameter is required",
},
}
if not variable_name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "variable_name parameter is required",
},
}
if not new_name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "new_name parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
from urllib.parse import quote
payload: dict = {"name": new_name}
if new_type:
payload["data_type"] = new_type
endpoint = f"functions/{function_address}/variables/{quote(variable_name)}"
response = self.safe_patch(port, endpoint, payload)
return self.simplify_response(response)
# Resources # Resources
@mcp_resource(uri="ghidra://instance/{port}/variables") @mcp_resource(uri="ghidra://instance/{port}/variables")

View File

@ -16,8 +16,10 @@ from fastmcp import FastMCP
from .config import GhydraConfig, get_config, set_config from .config import GhydraConfig, get_config, set_config
from .mixins import ( from .mixins import (
AnalysisMixin, AnalysisMixin,
BookmarksMixin,
CursorsMixin, CursorsMixin,
DataMixin, DataMixin,
DataTypesMixin,
DockerMixin, DockerMixin,
FunctionsMixin, FunctionsMixin,
InstancesMixin, InstancesMixin,
@ -64,6 +66,8 @@ def create_server(
segments_mixin = SegmentsMixin() segments_mixin = SegmentsMixin()
variables_mixin = VariablesMixin() variables_mixin = VariablesMixin()
namespaces_mixin = NamespacesMixin() namespaces_mixin = NamespacesMixin()
bookmarks_mixin = BookmarksMixin()
datatypes_mixin = DataTypesMixin()
# Register all mixins with the server # Register all mixins with the server
# Each mixin registers its tools, resources, and prompts # Each mixin registers its tools, resources, and prompts
@ -80,6 +84,8 @@ def create_server(
segments_mixin.register_all(mcp) segments_mixin.register_all(mcp)
variables_mixin.register_all(mcp) variables_mixin.register_all(mcp)
namespaces_mixin.register_all(mcp) namespaces_mixin.register_all(mcp)
bookmarks_mixin.register_all(mcp)
datatypes_mixin.register_all(mcp)
# Optional feedback collection # Optional feedback collection
cfg = get_config() cfg = get_config()