Merge feat/api-gap-fill: Session isolation, non-blocking I/O, CRUD operations
Some checks are pending
Build Ghidra Plugin / build (push) Waiting to run

Sprint 3+4 API gap filling plus critical stability fixes:

Features:
- Symbol CRUD (create, rename, delete)
- Bookmark management (list, create, delete)
- Enum/typedef creation
- Variable rename with type resolution

Stability fixes:
- Lazy instances_use (no blocking HTTP calls)
- Non-blocking health checks via thread executor
- Session isolation for docker_stop/cleanup
- Auto port allocation (removed client-specified ports)
- wait=False default for docker_auto_start
This commit is contained in:
Ryan Malloy 2026-02-05 09:27:26 -07:00
commit 7eefdda9f8
26 changed files with 2331 additions and 157 deletions

1
binaries/.gitkeep Normal file
View File

@ -0,0 +1 @@
# Add binaries here for analysis

View File

@ -305,6 +305,7 @@ ROUTES = [
("GET", r"^/strings$", "handle_strings"),
# Memory
("GET", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_get_comment"),
("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"),
("GET", r"^/memory/blocks$", "handle_memory_blocks"),
("GET", r"^/memory$", "handle_memory_read"),
@ -316,8 +317,26 @@ ROUTES = [
# Symbols
("GET", r"^/symbols/imports$", "handle_imports"),
("GET", r"^/symbols/exports$", "handle_exports"),
("POST", r"^/symbols$", "handle_symbol_create"),
("PATCH", r"^/symbols/([^/]+)$", "handle_symbol_rename"),
("DELETE", r"^/symbols/([^/]+)$", "handle_symbol_delete"),
("GET", r"^/symbols$", "handle_symbols"),
# Variables
("PATCH", r"^/functions/([^/]+)/variables/([^/]+)$", "handle_variable_rename"),
("GET", r"^/variables$", "handle_variables"),
# Bookmarks
("POST", r"^/bookmarks$", "handle_bookmark_create"),
("DELETE", r"^/bookmarks/([^/]+)$", "handle_bookmark_delete"),
("GET", r"^/bookmarks$", "handle_bookmarks"),
# Data types
("POST", r"^/datatypes/enums$", "handle_enum_create"),
("GET", r"^/datatypes/enums$", "handle_enums"),
("POST", r"^/datatypes/typedefs$", "handle_typedef_create"),
("GET", r"^/datatypes/typedefs$", "handle_typedefs"),
# Cross-references
("GET", r"^/xrefs$", "handle_xrefs"),
@ -734,26 +753,62 @@ class GhydraMCPHandler(HttpHandler):
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
name_filter = params.get("name")
name_contains = params.get("name_contains")
name_regex = params.get("name_matches_regex")
addr_filter = params.get("addr")
grep_pattern = compile_grep(params)
functions = []
# Compile name regex if provided
name_regex_pat = None
if name_regex:
try:
name_regex_pat = re.compile(name_regex, re.IGNORECASE)
except:
return {"success": False, "error": {"code": "INVALID_REGEX", "message": "Invalid regex: %s" % name_regex}}
fm = self.program.getFunctionManager()
total = fm.getFunctionCount()
# Single function lookup by address
if addr_filter:
func = self._find_function_at(addr_filter)
if not func:
return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit}
addr = str(func.getEntryPoint())
return {"success": True, "result": [{
"name": func.getName(),
"address": addr,
"signature": str(func.getSignature()),
"parameterCount": func.getParameterCount(),
"isThunk": func.isThunk(),
"_links": {
"self": make_link("/functions/%s" % addr),
"decompile": make_link("/functions/%s/decompile" % addr),
"disassembly": make_link("/functions/%s/disassembly" % addr),
},
}], "size": 1, "offset": 0, "limit": limit}
functions = []
count = 0
skipped = 0
for func in fm.getFunctions(True):
if count >= limit:
break
# Apply name filter
if name_filter and name_filter.lower() not in func.getName().lower():
func_name = func.getName()
# Apply name filters
if name_filter and name_filter.lower() not in func_name.lower():
continue
if name_contains and name_contains.lower() not in func_name.lower():
continue
if name_regex_pat and not name_regex_pat.search(func_name):
continue
if skipped < offset:
skipped += 1
continue
addr = str(func.getEntryPoint())
item = {
"name": func.getName(),
"name": func_name,
"address": addr,
"signature": str(func.getSignature()),
"parameterCount": func.getParameterCount(),
@ -1358,6 +1413,36 @@ class GhydraMCPHandler(HttpHandler):
"""List memory blocks (alias for /segments)."""
return self.handle_segments(exchange)
def handle_get_comment(self, exchange, addr_str, comment_type):
"""Get a comment at a specific address."""
if not self.program:
return self._no_program()
ct = COMMENT_TYPE_MAP.get(comment_type.lower())
if ct is None:
return {"success": False, "error": {
"code": "INVALID_COMMENT_TYPE",
"message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type
}}
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
listing = self.program.getListing()
cu = listing.getCodeUnitAt(addr)
if not cu:
cu = listing.getCodeUnitContaining(addr)
if not cu:
return {"success": True, "result": {"address": addr_str, "commentType": comment_type, "comment": None}}
comment = cu.getComment(ct)
return {"success": True, "result": {
"address": addr_str,
"commentType": comment_type,
"comment": comment,
}}
except Exception as e:
return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}}
def handle_set_comment(self, exchange, addr_str, comment_type):
"""Set a comment at a specific address."""
if not self.program:
@ -2112,6 +2197,518 @@ class GhydraMCPHandler(HttpHandler):
except Exception as e:
return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}}
# ==================================================================
# Symbol CRUD Handlers
# ==================================================================
def handle_symbol_create(self, exchange):
"""POST /symbols - Create a new label/symbol."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
name = body.get("name", "")
address = body.get("address", "")
if not name or not address:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Both 'name' and 'address' are required"}}
try:
addr = self.program.getAddressFactory().getAddress(address)
st = self.program.getSymbolTable()
def do_create():
st.createLabel(addr, name, SourceType.USER_DEFINED)
with_transaction(self.program, "Create symbol", do_create)
return {"success": True, "result": {
"name": name,
"address": address,
"message": "Symbol created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
def handle_symbol_rename(self, exchange, addr_str):
"""PATCH /symbols/{address} - Rename primary symbol at address."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
new_name = body.get("name", "")
if not new_name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
st = self.program.getSymbolTable()
symbol = st.getPrimarySymbol(addr)
if not symbol:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}
old_name = symbol.getName()
def do_rename():
symbol.setName(new_name, SourceType.USER_DEFINED)
with_transaction(self.program, "Rename symbol", do_rename)
return {"success": True, "result": {
"address": addr_str,
"oldName": old_name,
"newName": new_name,
"message": "Symbol renamed successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
def handle_symbol_delete(self, exchange, addr_str):
"""DELETE /symbols/{address} - Delete primary symbol at address."""
if not self.program:
return self._no_program()
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
st = self.program.getSymbolTable()
symbol = st.getPrimarySymbol(addr)
if not symbol:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}
name = symbol.getName()
def do_delete():
symbol.delete()
with_transaction(self.program, "Delete symbol", do_delete)
return {"success": True, "result": {
"address": addr_str,
"name": name,
"message": "Symbol deleted successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
# ==================================================================
# Variable Rename Handler
# ==================================================================
def handle_variable_rename(self, exchange, addr_str, var_name):
"""PATCH /functions/{address}/variables/{name} - Rename/retype a variable."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
new_name = body.get("name", "")
new_type = body.get("data_type")
if not new_name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}
try:
# URL-decode the variable name
from java.net import URLDecoder
decoded_name = URLDecoder.decode(var_name, "UTF-8")
func = self._find_function_at(addr_str)
if not func:
return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}}
# Search parameters and local variables
target_var = None
for param in func.getParameters():
if param.getName() == decoded_name:
target_var = param
break
if not target_var:
for var in func.getAllVariables():
if var.getName() == decoded_name:
target_var = var
break
if not target_var:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "Variable '%s' not found in function" % decoded_name}}
old_name = target_var.getName()
def do_rename():
target_var.setName(new_name, SourceType.USER_DEFINED)
if new_type:
dtm = self.program.getDataTypeManager()
dt = resolve_data_type(dtm, new_type)
if dt:
target_var.setDataType(dt, SourceType.USER_DEFINED)
with_transaction(self.program, "Rename variable", do_rename)
return {"success": True, "result": {
"function": addr_str,
"oldName": old_name,
"newName": new_name,
"message": "Variable renamed successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "VARIABLE_ERROR", "message": str(e)}}
# ==================================================================
# Variables Handler
# ==================================================================
def handle_variables(self, exchange):
"""GET /variables - List global and function variables."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
global_only = params.get("global_only", "false").lower() == "true"
search = params.get("search", "")
grep_pattern = compile_grep(params)
variables = []
count = 0
skipped = 0
# Function variables (parameters + locals)
if not global_only:
fm = self.program.getFunctionManager()
for func in fm.getFunctions(True):
if count >= limit:
break
func_name = func.getName()
func_addr = str(func.getEntryPoint())
for param in func.getParameters():
if count >= limit:
break
var_name = param.getName()
if search and search.lower() not in var_name.lower():
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": var_name,
"type": str(param.getDataType()),
"storage": str(param.getVariableStorage()),
"scope": "parameter",
"function": func_name,
"functionAddress": func_addr,
}
if not grep_matches_item(item, grep_pattern):
continue
variables.append(item)
count += 1
for var in func.getLocalVariables():
if count >= limit:
break
var_name = var.getName()
if search and search.lower() not in var_name.lower():
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": var_name,
"type": str(var.getDataType()),
"storage": str(var.getVariableStorage()),
"scope": "local",
"function": func_name,
"functionAddress": func_addr,
}
if not grep_matches_item(item, grep_pattern):
continue
variables.append(item)
count += 1
# Global variables (defined data with symbol names)
listing = self.program.getListing()
st = self.program.getSymbolTable()
for data in listing.getDefinedData(True):
if count >= limit:
break
addr = data.getAddress()
symbol = st.getPrimarySymbol(addr)
if not symbol:
continue
sym_name = symbol.getName()
# Skip auto-generated names
if sym_name.startswith("DAT_") or sym_name.startswith("s_"):
continue
if search and search.lower() not in sym_name.lower():
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": sym_name,
"address": str(addr),
"type": str(data.getDataType()),
"scope": "global",
"size": data.getLength(),
}
if not grep_matches_item(item, grep_pattern):
continue
variables.append(item)
count += 1
return {"success": True, "result": variables, "offset": offset, "limit": limit}
# ==================================================================
# Bookmarks Handlers
# ==================================================================
def handle_bookmarks(self, exchange):
"""GET /bookmarks - List bookmarks with optional filtering."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
type_filter = params.get("type")
category_filter = params.get("category")
grep_pattern = compile_grep(params)
bm = self.program.getBookmarkManager()
bookmarks = []
count = 0
skipped = 0
# Get bookmark types to iterate
if type_filter:
bm_types = [type_filter]
else:
bm_types = [str(t) for t in bm.getBookmarkTypes()]
for btype in bm_types:
if count >= limit:
break
try:
it = bm.getBookmarksIterator(btype)
except:
continue
while it.hasNext() and count < limit:
bookmark = it.next()
if category_filter and bookmark.getCategory() != category_filter:
continue
if skipped < offset:
skipped += 1
continue
item = {
"address": str(bookmark.getAddress()),
"type": bookmark.getTypeString(),
"category": bookmark.getCategory(),
"comment": bookmark.getComment(),
}
if not grep_matches_item(item, grep_pattern):
continue
bookmarks.append(item)
count += 1
return {"success": True, "result": bookmarks, "offset": offset, "limit": limit}
def handle_bookmark_create(self, exchange):
"""POST /bookmarks - Create a bookmark."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
address = body.get("address", "")
btype = body.get("type", "Note")
category = body.get("category", "")
comment = body.get("comment", "")
if not address:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'address' is required"}}
try:
addr = self.program.getAddressFactory().getAddress(address)
bm = self.program.getBookmarkManager()
def do_create():
bm.setBookmark(addr, btype, category, comment)
with_transaction(self.program, "Create bookmark", do_create)
return {"success": True, "result": {
"address": address,
"type": btype,
"category": category,
"comment": comment,
"message": "Bookmark created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}
def handle_bookmark_delete(self, exchange, addr_str):
"""DELETE /bookmarks/{address} - Delete all bookmarks at address."""
if not self.program:
return self._no_program()
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
bm = self.program.getBookmarkManager()
removed = []
def do_delete():
for bookmark in list(bm.getBookmarks(addr)):
removed.append(bookmark.getTypeString())
bookmark.remove() if hasattr(bookmark, 'remove') else bm.removeBookmark(bookmark)
with_transaction(self.program, "Delete bookmarks", do_delete)
return {"success": True, "result": {
"address": addr_str,
"removedTypes": removed,
"count": len(removed),
"message": "Bookmarks deleted successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}
# ==================================================================
# Enum Handlers
# ==================================================================
def handle_enums(self, exchange):
"""GET /datatypes/enums - List enum data types."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
grep_pattern = compile_grep(params)
from ghidra.program.model.data import Enum as GhidraEnum
dtm = self.program.getDataTypeManager()
enums = []
count = 0
skipped = 0
for dt in dtm.getAllDataTypes():
if count >= limit:
break
if not isinstance(dt, GhidraEnum):
continue
if skipped < offset:
skipped += 1
continue
# Get enum members
members = []
for name in dt.getNames():
members.append({"name": name, "value": dt.getValue(name)})
item = {
"name": dt.getName(),
"category": str(dt.getCategoryPath()),
"size": dt.getLength(),
"members": members,
}
if not grep_matches_item(item, grep_pattern):
continue
enums.append(item)
count += 1
return {"success": True, "result": enums, "offset": offset, "limit": limit}
def handle_enum_create(self, exchange):
"""POST /datatypes/enums - Create a new enum."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
name = body.get("name", "")
size = int(body.get("size", 4))
if not name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' is required"}}
try:
from ghidra.program.model.data import EnumDataType, CategoryPath
dtm = self.program.getDataTypeManager()
new_enum = EnumDataType(name, size)
def do_create():
dtm.addDataType(new_enum, None)
with_transaction(self.program, "Create enum", do_create)
return {"success": True, "result": {
"name": name,
"size": size,
"message": "Enum created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "ENUM_ERROR", "message": str(e)}}
# ==================================================================
# Typedef Handlers
# ==================================================================
def handle_typedefs(self, exchange):
"""GET /datatypes/typedefs - List typedef data types."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
grep_pattern = compile_grep(params)
from ghidra.program.model.data import TypeDef
dtm = self.program.getDataTypeManager()
typedefs = []
count = 0
skipped = 0
for dt in dtm.getAllDataTypes():
if count >= limit:
break
if not isinstance(dt, TypeDef):
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": dt.getName(),
"category": str(dt.getCategoryPath()),
"baseType": dt.getBaseDataType().getName() if dt.getBaseDataType() else None,
"size": dt.getLength(),
}
if not grep_matches_item(item, grep_pattern):
continue
typedefs.append(item)
count += 1
return {"success": True, "result": typedefs, "offset": offset, "limit": limit}
def handle_typedef_create(self, exchange):
"""POST /datatypes/typedefs - Create a new typedef."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
name = body.get("name", "")
base_type_name = body.get("base_type", "")
if not name or not base_type_name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' and 'base_type' are required"}}
try:
from ghidra.program.model.data import TypedefDataType
dtm = self.program.getDataTypeManager()
# Use the shared resolver which handles builtins + path lookups
base_dt = resolve_data_type(dtm, base_type_name)
if not base_dt:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "Base type not found: %s" % base_type_name}}
new_typedef = TypedefDataType(name, base_dt)
def do_create():
dtm.addDataType(new_typedef, None)
with_transaction(self.program, "Create typedef", do_create)
return {"success": True, "result": {
"name": name,
"baseType": base_type_name,
"message": "Typedef created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "TYPEDEF_ERROR", "message": str(e)}}
# ==================================================================
# Legacy Compatibility
# ==================================================================

View File

@ -97,6 +97,14 @@ class GhydraConfig:
"data": 1000,
"structs": 500,
"xrefs": 500,
"symbols": 1000,
"segments": 500,
"variables": 1000,
"namespaces": 500,
"classes": 500,
"bookmarks": 1000,
"enums": 500,
"typedefs": 500,
})
def __post_init__(self):

View File

@ -3,38 +3,38 @@
Contains HTTP client, pagination, progress reporting, and logging utilities.
"""
from .filtering import (
apply_grep,
estimate_and_guard,
project_fields,
)
from .http_client import (
get_instance_url,
safe_delete,
safe_get,
safe_patch,
safe_post,
safe_put,
safe_patch,
safe_delete,
simplify_response,
get_instance_url,
)
from .logging import (
log_debug,
log_error,
log_info,
log_warning,
)
from .pagination import (
CursorManager,
CursorState,
paginate_response,
get_cursor_manager,
estimate_tokens,
get_cursor_manager,
paginate_response,
)
from .progress import (
ProgressReporter,
report_progress,
report_step,
)
from .filtering import (
project_fields,
apply_grep,
estimate_and_guard,
)
from .logging import (
log_info,
log_debug,
log_warning,
log_error,
)
__all__ = [
# HTTP client

View File

@ -11,7 +11,6 @@ from typing import Any, Dict, Optional
from ..config import get_config
# Token estimation (same ratio as pagination.py)
TOKEN_ESTIMATION_RATIO = 4.0

View File

@ -12,7 +12,6 @@ import requests
from ..config import get_config
# Allowed origins for CORS-like validation
ALLOWED_ORIGINS = {
"http://localhost",

View File

@ -5,7 +5,7 @@ client-visible logging when available, with fallback to standard logging.
"""
import logging
from typing import Optional, TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from mcp.server.fastmcp import Context

View File

@ -14,8 +14,7 @@ from threading import Lock
from typing import Any, Dict, List, Optional, Tuple
from ..config import get_config
from .filtering import project_fields, estimate_and_guard
from .filtering import estimate_and_guard, project_fields
# ReDoS Protection Configuration
MAX_GREP_PATTERN_LENGTH = 500

View File

@ -4,7 +4,7 @@ Provides async progress reporting using FastMCP's Context for
real-time progress notifications to MCP clients.
"""
from typing import Optional, TYPE_CHECKING
from typing import TYPE_CHECKING, Optional
if TYPE_CHECKING:
from mcp.server.fastmcp import Context

View File

@ -4,16 +4,22 @@ Domain-specific mixins that organize tools, resources, and prompts by functional
Uses FastMCP's contrib.mcp_mixin pattern for clean modular organization.
"""
from .base import GhydraMixinBase
from .instances import InstancesMixin
from .functions import FunctionsMixin
from .data import DataMixin
from .structs import StructsMixin
from .analysis import AnalysisMixin
from .memory import MemoryMixin
from .xrefs import XrefsMixin
from .base import GhydraMixinBase
from .bookmarks import BookmarksMixin
from .cursors import CursorsMixin
from .data import DataMixin
from .datatypes import DataTypesMixin
from .docker import DockerMixin
from .functions import FunctionsMixin
from .instances import InstancesMixin
from .memory import MemoryMixin
from .namespaces import NamespacesMixin
from .segments import SegmentsMixin
from .structs import StructsMixin
from .symbols import SymbolsMixin
from .variables import VariablesMixin
from .xrefs import XrefsMixin
__all__ = [
"GhydraMixinBase",
@ -26,4 +32,10 @@ __all__ = [
"XrefsMixin",
"CursorsMixin",
"DockerMixin",
"SymbolsMixin",
"SegmentsMixin",
"VariablesMixin",
"NamespacesMixin",
"BookmarksMixin",
"DataTypesMixin",
]

View File

@ -8,8 +8,8 @@ from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool
from .base import GhydraMixinBase
from ..config import get_config
from .base import GhydraMixinBase
class AnalysisMixin(GhydraMixinBase):
@ -277,6 +277,40 @@ class AnalysisMixin(GhydraMixinBase):
response = self.safe_get(port, "function")
return self.simplify_response(response)
@mcp_tool()
def comments_get(
self,
address: str,
comment_type: str = "plate",
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Get a comment at the specified address.
Args:
address: Memory address in hex format
comment_type: "plate", "pre", "post", "eol", "repeatable"
port: Ghidra instance port (optional)
Returns:
Comment text and metadata
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, f"memory/{address}/comments/{comment_type}")
return self.simplify_response(response)
@mcp_tool()
def comments_set(
self,

View File

@ -11,9 +11,16 @@ from fastmcp import Context
from fastmcp.contrib.mcp_mixin import MCPMixin
from ..config import get_config
from ..core.http_client import safe_get, safe_post, safe_put, safe_patch, safe_delete, simplify_response
from ..core.http_client import (
safe_delete,
safe_get,
safe_patch,
safe_post,
safe_put,
simplify_response,
)
from ..core.logging import log_debug, log_error, log_info, log_warning
from ..core.pagination import paginate_response
from ..core.logging import log_info, log_debug, log_warning, log_error
class GhydraMixinBase(MCPMixin):

View File

@ -0,0 +1,171 @@
"""Bookmarks mixin for GhydraMCP.
Provides tools for managing Ghidra bookmarks (annotations at addresses).
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class BookmarksMixin(GhydraMixinBase):
"""Mixin for bookmark operations.
Provides tools for:
- Listing bookmarks with type/category filtering
- Creating bookmarks at addresses
- Deleting bookmarks
"""
@mcp_tool()
def bookmarks_list(
self,
type: Optional[str] = None,
category: Optional[str] = None,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List bookmarks with optional type/category filtering.
Args:
type: Filter by bookmark type (e.g. "Note", "Warning", "Error", "Info")
category: Filter by bookmark category
port: Ghidra instance port (optional)
page_size: Bookmarks per page (default: 50, max: 500)
grep: Regex pattern to filter bookmark comments
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all bookmarks without pagination
fields: Field names to keep (e.g. ['address', 'type', 'comment']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of bookmarks
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("bookmarks", 1000)
params: Dict[str, Any] = {"limit": cap}
if type:
params["type"] = type
if category:
params["category"] = category
response = self.safe_get(port, "bookmarks", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
bookmarks = simplified.get("result", [])
if not isinstance(bookmarks, list):
bookmarks = []
query_params = {
"tool": "bookmarks_list",
"port": port,
"type": type,
"category": category,
"grep": grep,
}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=bookmarks,
query_params=query_params,
tool_name="bookmarks_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def bookmarks_create(
self,
address: str,
type: str = "Note",
category: str = "",
comment: str = "",
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a bookmark at the specified address.
Args:
address: Memory address in hex format
type: Bookmark type (default: "Note"). Common types: Note, Warning, Error, Info
category: Bookmark category (optional grouping string)
comment: Bookmark comment text
port: Ghidra instance port (optional)
Returns:
Created bookmark information
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {
"address": address,
"type": type,
"category": category,
"comment": comment,
}
response = self.safe_post(port, "bookmarks", payload)
return self.simplify_response(response)
@mcp_tool()
def bookmarks_delete(
self,
address: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Delete all bookmarks at the specified address.
Args:
address: Memory address in hex format
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_delete(port, f"bookmarks/{address}")
return self.simplify_response(response)

View File

@ -8,8 +8,8 @@ from typing import Any, Dict, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool
from .base import GhydraMixinBase
from ..core.pagination import get_cursor_manager
from .base import GhydraMixinBase
class CursorsMixin(GhydraMixinBase):

View File

@ -6,10 +6,10 @@ Provides tools for data items and strings operations.
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from .base import GhydraMixinBase
from ..config import get_config
from .base import GhydraMixinBase
class DataMixin(GhydraMixinBase):

View File

@ -0,0 +1,217 @@
"""Data types mixin for GhydraMCP.
Provides tools for managing enum and typedef data types.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class DataTypesMixin(GhydraMixinBase):
"""Mixin for enum and typedef data type operations.
Provides tools for:
- Listing and creating enum data types
- Listing and creating typedef data types
"""
# --- Enums ---
@mcp_tool()
def enums_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List enum data types with their members.
Args:
port: Ghidra instance port (optional)
page_size: Enums per page (default: 50, max: 500)
grep: Regex pattern to filter enum names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all enums without pagination
fields: Field names to keep (e.g. ['name', 'size', 'members']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of enum data types
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("enums", 500)
response = self.safe_get(port, "datatypes/enums", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
enums = simplified.get("result", [])
if not isinstance(enums, list):
enums = []
query_params = {"tool": "enums_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=enums,
query_params=query_params,
tool_name="enums_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def enums_create(
self,
name: str,
size: int = 4,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new enum data type.
Args:
name: Name for the new enum
size: Size in bytes (default: 4)
port: Ghidra instance port (optional)
Returns:
Created enum information
"""
if not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "name parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": name, "size": size}
response = self.safe_post(port, "datatypes/enums", payload)
return self.simplify_response(response)
# --- Typedefs ---
@mcp_tool()
def typedefs_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List typedef data types.
Args:
port: Ghidra instance port (optional)
page_size: Typedefs per page (default: 50, max: 500)
grep: Regex pattern to filter typedef names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all typedefs without pagination
fields: Field names to keep (e.g. ['name', 'base_type']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of typedef data types
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("typedefs", 500)
response = self.safe_get(port, "datatypes/typedefs", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
typedefs = simplified.get("result", [])
if not isinstance(typedefs, list):
typedefs = []
query_params = {"tool": "typedefs_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=typedefs,
query_params=query_params,
tool_name="typedefs_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def typedefs_create(
self,
name: str,
base_type: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new typedef data type.
Args:
name: Name for the new typedef
base_type: Name of the base data type (e.g. "int", "uint32_t", "char*")
port: Ghidra instance port (optional)
Returns:
Created typedef information
"""
if not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "name parameter is required",
},
}
if not base_type:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "base_type parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": name, "base_type": base_type}
response = self.safe_post(port, "datatypes/typedefs", payload)
return self.simplify_response(response)

View File

@ -16,17 +16,14 @@ import subprocess
import time
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional, Set
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool
from ..config import get_config, get_docker_config
# Port pool configuration
# Port pool configuration (32 ports should handle many concurrent sessions)
PORT_POOL_START = 8192
PORT_POOL_END = 8199
PORT_POOL_END = 8223
PORT_LOCK_DIR = Path("/tmp/ghydramcp-ports")
@ -207,7 +204,7 @@ class DockerMixin(MCPMixin):
with the GhydraMCP plugin pre-installed.
Supports multi-process environments with:
- Dynamic port allocation from a pool (8192-8199)
- Dynamic port allocation from a pool (8192-8223)
- Session-scoped container naming with UUIDs
- Docker label-based tracking for cross-process visibility
- Automatic cleanup of orphaned containers
@ -513,7 +510,6 @@ class DockerMixin(MCPMixin):
async def docker_start(
self,
binary_path: str,
port: Optional[int] = None,
memory: str = "2G",
name: Optional[str] = None,
ctx: Optional[Context] = None,
@ -522,15 +518,14 @@ class DockerMixin(MCPMixin):
This creates a new Ghidra instance in Docker with the GhydraMCP
plugin pre-installed. The binary will be imported and analyzed,
then the HTTP API will be available on the specified port.
then the HTTP API will be available.
If no port is specified, one will be automatically allocated from
the pool (8192-8199). Container names are auto-generated with the
session ID to ensure uniqueness across processes.
Ports are automatically allocated from the pool (8192-8223) to
prevent conflicts between concurrent sessions. Container names
are auto-generated with the session ID to ensure uniqueness.
Args:
binary_path: Path to the binary file to analyze
port: Port to expose the HTTP API (auto-allocated if not specified)
memory: Max JVM heap memory (default: 2G)
name: Container name (auto-generated if not specified)
@ -545,16 +540,13 @@ class DockerMixin(MCPMixin):
if not binary_file.exists():
return {"error": f"Binary not found: {binary_path}"}
# Allocate port from pool if not specified
allocated_port = False
# Always allocate from pool to prevent conflicts between sessions
port = self._port_pool.allocate(self.session_id)
if port is None:
port = self._port_pool.allocate(self.session_id)
if port is None:
return {
"error": "Port pool exhausted (8192-8199). Stop some containers first.",
"allocated_ports": self._port_pool.get_allocated_ports(),
}
allocated_port = True
return {
"error": "Port pool exhausted (8192-8223). Stop some containers first.",
"allocated_ports": self._port_pool.get_allocated_ports(),
}
# Generate container name if not specified
if name is None:
@ -569,8 +561,7 @@ class DockerMixin(MCPMixin):
["ps", "-a", "-q", "-f", f"name=^{name}$"], check=False
)
if check_result.stdout.strip():
if allocated_port:
self._port_pool.release(port)
self._port_pool.release(port)
return {
"error": f"Container '{name}' already exists. Stop it first with docker_stop."
}
@ -580,8 +571,7 @@ class DockerMixin(MCPMixin):
["ps", "-q", "-f", f"publish={port}"], check=False
)
if port_check.stdout.strip():
if allocated_port:
self._port_pool.release(port)
self._port_pool.release(port)
return {
"error": f"Port {port} is already in use by another container"
}
@ -619,7 +609,6 @@ class DockerMixin(MCPMixin):
"port": port,
"binary": str(binary_file),
"memory": memory,
"allocated_port": allocated_port,
}
return {
@ -638,8 +627,7 @@ class DockerMixin(MCPMixin):
}
except subprocess.CalledProcessError as e:
if allocated_port:
self._port_pool.release(port)
self._port_pool.release(port)
return {"error": f"Failed to start container: {e.stderr or e.stdout}"}
@mcp_tool(
@ -651,6 +639,10 @@ class DockerMixin(MCPMixin):
) -> Dict[str, Any]:
"""Stop a GhydraMCP Docker container.
For safety, this will only stop containers that belong to the current
MCP session. Attempting to stop another session's container will fail
with an error explaining whose container it is.
Args:
name_or_id: Container name or ID
remove: Also remove the container (default: True)
@ -661,18 +653,34 @@ class DockerMixin(MCPMixin):
if not self._check_docker_available():
return {"error": "Docker is not available on this system"}
# Find the container to get its port for pool release
# Get container's session and port labels for validation
container_port = None
container_session = None
try:
inspect_result = self._run_docker_cmd(
["inspect", "--format", "{{index .Config.Labels \"" + self.LABEL_PREFIX + ".port\"}}", name_or_id],
[
"inspect",
"--format",
"{{index .Config.Labels \"" + self.LABEL_PREFIX + ".port\"}}|{{index .Config.Labels \"" + self.LABEL_PREFIX + ".session\"}}",
name_or_id,
],
check=False,
)
if inspect_result.stdout.strip().isdigit():
container_port = int(inspect_result.stdout.strip())
parts = inspect_result.stdout.strip().split("|")
if len(parts) >= 2:
if parts[0].isdigit():
container_port = int(parts[0])
container_session = parts[1] if parts[1] else None
except Exception:
pass
# Session validation: only allow stopping own containers
if container_session and container_session != self.session_id:
return {
"error": f"Cannot stop container '{name_or_id}' - it belongs to session '{container_session}', not this session '{self.session_id}'.",
"hint": "Each MCP session can only stop its own containers for safety.",
}
try:
# Stop the container
self._run_docker_cmd(["stop", name_or_id])
@ -807,25 +815,19 @@ class DockerMixin(MCPMixin):
except subprocess.CalledProcessError as e:
return {"error": f"Build failed: {e.stderr or e.stdout}"}
@mcp_tool(
name="docker_health",
description="Check if a GhydraMCP container's API is responding",
)
async def docker_health(
self, port: int = 8192, timeout: float = 5.0, ctx: Optional[Context] = None
) -> Dict[str, Any]:
"""Check if a GhydraMCP container's API is healthy.
def _sync_health_check(self, port: int, timeout: float) -> Dict[str, Any]:
"""Synchronous health check (runs in thread to avoid blocking event loop).
Args:
port: API port to check (default: 8192)
port: API port to check
timeout: Request timeout in seconds
Returns:
Health status and API info if available
Health status dict
"""
import urllib.request
import urllib.error
import json as json_module
import urllib.error
import urllib.request
url = f"http://localhost:{port}/"
@ -854,6 +856,27 @@ class DockerMixin(MCPMixin):
"error": str(e),
}
@mcp_tool(
name="docker_health",
description="Check if a GhydraMCP container's API is responding",
)
async def docker_health(
self, port: int = 8192, timeout: float = 5.0, ctx: Optional[Context] = None
) -> Dict[str, Any]:
"""Check if a GhydraMCP container's API is healthy.
Args:
port: API port to check (default: 8192)
timeout: Request timeout in seconds
Returns:
Health status and API info if available
"""
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None, self._sync_health_check, port, timeout
)
@mcp_tool(
name="docker_wait",
description="Wait for a GhydraMCP container to become healthy",
@ -902,57 +925,51 @@ class DockerMixin(MCPMixin):
async def docker_auto_start(
self,
binary_path: str,
port: Optional[int] = None,
wait: bool = True,
wait: bool = False,
timeout: float = 300.0,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""Automatically start a Docker container with intelligent port allocation.
This is the main entry point for automatic Docker management:
1. Checks if a Ghidra instance is already running (on specified or any pooled port)
1. Checks if a Ghidra instance with the SAME binary is already running
2. If not, allocates a port from the pool and starts a new container
3. Optionally waits for the container to become healthy
4. Returns connection info for the instance
When port is not specified, the system will:
- First check all pooled ports (8192-8199) for an existing healthy instance
- If none found, allocate a new port from the pool
Ports are auto-allocated from the pool (8192-8223) to prevent
conflicts between concurrent sessions.
Args:
binary_path: Path to the binary to analyze
port: Specific port for the HTTP API (auto-allocated if not specified)
wait: Wait for container to be ready (default: True)
wait: Wait for container to be ready (default: False, use docker_wait separately)
timeout: Max wait time in seconds (default: 300)
Returns:
Instance connection info with session ID and port details
"""
# If port is specified, check that specific port
if port is not None:
health = await self.docker_health(port=port, ctx=ctx)
if health.get("healthy"):
import os
requested_name = os.path.basename(binary_path)
def _is_same_binary(health_program: str) -> bool:
"""Check if a running instance has the same binary loaded."""
if not health_program:
return False
return os.path.basename(health_program) == requested_name
# Check all pooled ports for an instance with the SAME binary
for check_port in range(PORT_POOL_START, PORT_POOL_END + 1):
health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx)
if health.get("healthy") and _is_same_binary(health.get("program", "")):
return {
"source": "existing",
"session_id": self.session_id,
"port": port,
"api_url": f"http://localhost:{port}/",
"port": check_port,
"api_url": f"http://localhost:{check_port}/",
"program": health.get("program"),
"message": "Using existing Ghidra instance",
"message": f"Found existing Ghidra instance on port {check_port}",
}
else:
# Check all pooled ports for an existing instance
for check_port in range(PORT_POOL_START, PORT_POOL_END + 1):
health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx)
if health.get("healthy"):
return {
"source": "existing",
"session_id": self.session_id,
"port": check_port,
"api_url": f"http://localhost:{check_port}/",
"program": health.get("program"),
"message": f"Found existing Ghidra instance on port {check_port}",
}
# Check if Docker is available
status = await self.docker_status(ctx=ctx)
@ -970,9 +987,9 @@ class DockerMixin(MCPMixin):
)
}
# Start a new container (port will be auto-allocated if not specified)
# Start a new container (port auto-allocated from pool)
start_result = await self.docker_start(
binary_path=binary_path, port=port, ctx=ctx
binary_path=binary_path, ctx=ctx
)
if not start_result.get("success"):
@ -1021,7 +1038,7 @@ class DockerMixin(MCPMixin):
)
async def docker_cleanup(
self,
session_only: bool = False,
session_only: bool = True,
max_age_hours: float = 24.0,
dry_run: bool = False,
ctx: Optional[Context] = None,
@ -1031,8 +1048,12 @@ class DockerMixin(MCPMixin):
This helps recover from crashed processes that left containers or
port locks behind.
By default, only cleans containers from the current session to prevent
accidentally removing another agent's work. Set session_only=False
(with caution) to clean all GhydraMCP containers.
Args:
session_only: Only clean up containers from this session
session_only: Only clean up containers from this session (default: True for safety)
max_age_hours: Max age for orphaned containers (default: 24 hours)
dry_run: If True, only report what would be cleaned up

View File

@ -7,10 +7,10 @@ from typing import Any, Dict, List, Optional
from urllib.parse import quote
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from .base import GhydraMixinBase
from ..config import get_config
from .base import GhydraMixinBase
class FunctionsMixin(GhydraMixinBase):
@ -28,6 +28,9 @@ class FunctionsMixin(GhydraMixinBase):
@mcp_tool()
def functions_list(
self,
name_contains: Optional[str] = None,
name_regex: Optional[str] = None,
address: Optional[str] = None,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
@ -36,12 +39,15 @@ class FunctionsMixin(GhydraMixinBase):
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List functions with cursor-based pagination.
"""List functions with cursor-based pagination and server-side filtering.
Args:
name_contains: Server-side substring filter on function name (faster than grep for large binaries)
name_regex: Server-side regex filter on function name
address: Filter by exact function address (hex)
port: Ghidra instance port (optional)
page_size: Functions per page (default: 50, max: 500)
grep: Regex pattern to filter function names
grep: Client-side regex pattern to filter function names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all functions without pagination
fields: Field names to keep (e.g. ['name', 'address']). Reduces response size.
@ -56,7 +62,15 @@ class FunctionsMixin(GhydraMixinBase):
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
response = self.safe_get(port, "functions", {"limit": 10000})
params = {"limit": 10000}
if name_contains:
params["name_contains"] = name_contains
if name_regex:
params["name_matches_regex"] = name_regex
if address:
params["addr"] = address
response = self.safe_get(port, "functions", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
@ -66,7 +80,14 @@ class FunctionsMixin(GhydraMixinBase):
if not isinstance(functions, list):
functions = []
query_params = {"tool": "functions_list", "port": port, "grep": grep}
query_params = {
"tool": "functions_list",
"port": port,
"name_contains": name_contains,
"name_regex": name_regex,
"address": address,
"grep": grep,
}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
@ -467,7 +488,7 @@ class FunctionsMixin(GhydraMixinBase):
"functions": functions[:cap],
"count": len(functions),
"capped_at": cap if len(functions) >= cap else None,
"_hint": f"Use functions_list() tool for full pagination" if len(functions) >= cap else None,
"_hint": "Use functions_list() tool for full pagination" if len(functions) >= cap else None,
}
@mcp_resource(uri="ghidra://instance/{port}/function/decompile/address/{address}")

View File

@ -4,13 +4,12 @@ Provides tools for discovering, registering, and managing Ghidra instances.
"""
import time
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from .base import GhydraMixinBase
from ..config import get_config
from ..core.http_client import safe_get
from .base import GhydraMixinBase
class InstancesMixin(GhydraMixinBase):
@ -167,13 +166,20 @@ class InstancesMixin(GhydraMixinBase):
Returns:
Confirmation message with instance details
"""
# Register lazily without blocking HTTP calls.
# If the instance is unknown, create a stub entry — the first
# actual tool call (functions_list, etc.) will validate the
# connection and fail fast with a clear error if unreachable.
with self._instances_lock:
needs_register = port not in self._instances
if needs_register:
result = self.register_instance(port)
if "Failed" in result or "Error" in result:
return result
if port not in self._instances:
config = get_config()
self._instances[port] = {
"url": f"http://{config.ghidra_host}:{port}",
"project": "",
"file": "",
"registered_at": time.time(),
"lazy": True,
}
self.set_current_port(port)
@ -211,6 +217,25 @@ class InstancesMixin(GhydraMixinBase):
return {"port": port, "status": "registered but no details available"}
@mcp_tool()
def program_info(self, port: Optional[int] = None) -> Dict[str, Any]:
"""Get full program metadata (architecture, language, compiler, image base, memory size).
Args:
port: Ghidra instance port (optional)
Returns:
Program metadata including architecture, language, compiler spec,
image base address, and total memory size
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, "program")
return self.simplify_response(response)
@mcp_resource(uri="ghidra://instances")
def resource_instances_list(self) -> Dict[str, Any]:
"""MCP Resource: List all active Ghidra instances.
@ -297,3 +322,21 @@ class InstancesMixin(GhydraMixinBase):
"string_count": string_count,
"port": port,
}
@mcp_resource(uri="ghidra://instance/{port}/program")
def resource_program_info(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: Get program metadata for a Ghidra instance.
Args:
port: Ghidra instance port
Returns:
Program metadata (architecture, language, compiler, image base)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
response = self.safe_get(port, "program")
return self.simplify_response(response)

View File

@ -0,0 +1,211 @@
"""Namespaces mixin for GhydraMCP.
Provides tools for querying namespaces and class definitions.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class NamespacesMixin(GhydraMixinBase):
"""Mixin for namespace and class operations.
Provides tools for:
- Listing all non-global namespaces
- Listing class namespaces with qualified names
"""
@mcp_tool()
def namespaces_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List all non-global namespaces with pagination.
Args:
port: Ghidra instance port (optional)
page_size: Namespaces per page (default: 50, max: 500)
grep: Regex pattern to filter namespace names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all namespaces without pagination
fields: Field names to keep (e.g. ['name', 'id']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of namespaces
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("namespaces", 500)
response = self.safe_get(port, "namespaces", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
namespaces = simplified.get("result", [])
if not isinstance(namespaces, list):
namespaces = []
query_params = {"tool": "namespaces_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=namespaces,
query_params=query_params,
tool_name="namespaces_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def classes_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List class namespaces with qualified names.
Args:
port: Ghidra instance port (optional)
page_size: Classes per page (default: 50, max: 500)
grep: Regex pattern to filter class names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all classes without pagination
fields: Field names to keep (e.g. ['name', 'qualified_name']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of class namespaces
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("classes", 500)
response = self.safe_get(port, "classes", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
classes = simplified.get("result", [])
if not isinstance(classes, list):
classes = []
query_params = {"tool": "classes_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=classes,
query_params=query_params,
tool_name="classes_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
# Resources
@mcp_resource(uri="ghidra://instance/{port}/namespaces")
def resource_namespaces_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List namespaces (capped).
Args:
port: Ghidra instance port
Returns:
List of namespaces (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("namespaces", 500)
response = self.safe_get(port, "namespaces", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
namespaces = simplified.get("result", [])
if not isinstance(namespaces, list):
namespaces = []
return {
"namespaces": namespaces[:cap],
"count": len(namespaces),
"capped_at": cap if len(namespaces) >= cap else None,
"_hint": "Use namespaces_list() tool for full pagination"
if len(namespaces) >= cap
else None,
}
@mcp_resource(uri="ghidra://instance/{port}/classes")
def resource_classes_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List classes (capped).
Args:
port: Ghidra instance port
Returns:
List of class namespaces (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("classes", 500)
response = self.safe_get(port, "classes", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
classes = simplified.get("result", [])
if not isinstance(classes, list):
classes = []
return {
"classes": classes[:cap],
"count": len(classes),
"capped_at": cap if len(classes) >= cap else None,
"_hint": "Use classes_list() tool for full pagination"
if len(classes) >= cap
else None,
}

View File

@ -0,0 +1,122 @@
"""Segments mixin for GhydraMCP.
Provides tools for querying memory segments (sections) and their permissions.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class SegmentsMixin(GhydraMixinBase):
"""Mixin for memory segment operations.
Provides tools for:
- Listing memory segments with permissions and size info
"""
@mcp_tool()
def segments_list(
self,
name: Optional[str] = None,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List memory segments with R/W/X permissions and size info.
Args:
name: Filter by segment name (server-side, exact match)
port: Ghidra instance port (optional)
page_size: Segments per page (default: 50, max: 500)
grep: Regex pattern to filter segment names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all segments without pagination
fields: Field names to keep (e.g. ['name', 'start', 'permissions']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of memory segments
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("segments", 500)
params = {"limit": cap}
if name:
params["name"] = name
response = self.safe_get(port, "segments", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
segments = simplified.get("result", [])
if not isinstance(segments, list):
segments = []
query_params = {"tool": "segments_list", "port": port, "name": name, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=segments,
query_params=query_params,
tool_name="segments_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
# Resources
@mcp_resource(uri="ghidra://instance/{port}/segments")
def resource_segments_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List memory segments (capped).
Args:
port: Ghidra instance port
Returns:
List of memory segments (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("segments", 500)
response = self.safe_get(port, "segments", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
segments = simplified.get("result", [])
if not isinstance(segments, list):
segments = []
return {
"segments": segments[:cap],
"count": len(segments),
"capped_at": cap if len(segments) >= cap else None,
"_hint": "Use segments_list() tool for full pagination"
if len(segments) >= cap
else None,
}

View File

@ -6,10 +6,10 @@ Provides tools for struct data type operations.
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from .base import GhydraMixinBase
from ..config import get_config
from .base import GhydraMixinBase
class StructsMixin(GhydraMixinBase):

View File

@ -0,0 +1,422 @@
"""Symbols mixin for GhydraMCP.
Provides tools for symbol table operations including labels, imports, and exports.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class SymbolsMixin(GhydraMixinBase):
"""Mixin for symbol table operations.
Provides tools for:
- Listing all symbols with pagination
- Querying imported symbols (external references)
- Querying exported symbols (entry points)
"""
@mcp_tool()
def symbols_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List symbols with cursor-based pagination.
Args:
port: Ghidra instance port (optional)
page_size: Symbols per page (default: 50, max: 500)
grep: Regex pattern to filter symbol names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all symbols without pagination
fields: Field names to keep (e.g. ['name', 'address']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of symbols
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
symbols = simplified.get("result", [])
if not isinstance(symbols, list):
symbols = []
query_params = {"tool": "symbols_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=symbols,
query_params=query_params,
tool_name="symbols_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def symbols_imports(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List imported symbols (external references) with pagination.
Args:
port: Ghidra instance port (optional)
page_size: Imports per page (default: 50, max: 500)
grep: Regex pattern to filter import names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all imports without pagination
fields: Field names to keep. Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of imported symbols
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols/imports", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
imports = simplified.get("result", [])
if not isinstance(imports, list):
imports = []
query_params = {"tool": "symbols_imports", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=imports,
query_params=query_params,
tool_name="symbols_imports",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def symbols_exports(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List exported symbols (entry points) with pagination.
Args:
port: Ghidra instance port (optional)
page_size: Exports per page (default: 50, max: 500)
grep: Regex pattern to filter export names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all exports without pagination
fields: Field names to keep. Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of exported symbols
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols/exports", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
exports = simplified.get("result", [])
if not isinstance(exports, list):
exports = []
query_params = {"tool": "symbols_exports", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=exports,
query_params=query_params,
tool_name="symbols_exports",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def symbols_create(
self,
name: str,
address: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new label/symbol at the specified address.
Args:
name: Name for the new symbol
address: Memory address in hex format
port: Ghidra instance port (optional)
Returns:
Created symbol information
"""
if not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "name parameter is required",
},
}
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": name, "address": address}
response = self.safe_post(port, "symbols", payload)
return self.simplify_response(response)
@mcp_tool()
def symbols_rename(
self,
address: str,
new_name: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Rename the primary symbol at the specified address.
Args:
address: Memory address of the symbol in hex format
new_name: New name for the symbol
port: Ghidra instance port (optional)
Returns:
Operation result with old and new names
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
if not new_name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "new_name parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": new_name}
response = self.safe_patch(port, f"symbols/{address}", payload)
return self.simplify_response(response)
@mcp_tool()
def symbols_delete(
self,
address: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Delete the primary symbol at the specified address.
Args:
address: Memory address of the symbol in hex format
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_delete(port, f"symbols/{address}")
return self.simplify_response(response)
# Resources
@mcp_resource(uri="ghidra://instance/{port}/symbols")
def resource_symbols_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List symbols (capped).
Args:
port: Ghidra instance port
Returns:
List of symbols (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
symbols = simplified.get("result", [])
if not isinstance(symbols, list):
symbols = []
return {
"symbols": symbols[:cap],
"count": len(symbols),
"capped_at": cap if len(symbols) >= cap else None,
"_hint": "Use symbols_list() tool for full pagination" if len(symbols) >= cap else None,
}
@mcp_resource(uri="ghidra://instance/{port}/symbols/imports")
def resource_symbols_imports(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List imported symbols (capped).
Args:
port: Ghidra instance port
Returns:
List of imported symbols (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols/imports", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
imports = simplified.get("result", [])
if not isinstance(imports, list):
imports = []
return {
"imports": imports[:cap],
"count": len(imports),
"capped_at": cap if len(imports) >= cap else None,
"_hint": "Use symbols_imports() tool for full pagination"
if len(imports) >= cap
else None,
}
@mcp_resource(uri="ghidra://instance/{port}/symbols/exports")
def resource_symbols_exports(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List exported symbols (capped).
Args:
port: Ghidra instance port
Returns:
List of exported symbols (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols/exports", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
exports = simplified.get("result", [])
if not isinstance(exports, list):
exports = []
return {
"exports": exports[:cap],
"count": len(exports),
"capped_at": cap if len(exports) >= cap else None,
"_hint": "Use symbols_exports() tool for full pagination"
if len(exports) >= cap
else None,
}

View File

@ -0,0 +1,261 @@
"""Variables mixin for GhydraMCP.
Provides tools for querying global and function-local variables.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class VariablesMixin(GhydraMixinBase):
"""Mixin for variable operations.
Provides tools for:
- Listing global and function variables
- Querying local variables and parameters for a specific function
"""
@mcp_tool()
def variables_list(
self,
global_only: bool = False,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List variables with cursor-based pagination.
Args:
global_only: Only return global variables (default: False)
port: Ghidra instance port (optional)
page_size: Variables per page (default: 50, max: 500)
grep: Regex pattern to filter variable names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all variables without pagination
fields: Field names to keep (e.g. ['name', 'type', 'address']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of variables
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("variables", 1000)
params = {"limit": cap}
if global_only:
params["global_only"] = "true"
response = self.safe_get(port, "variables", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
variables = simplified.get("result", [])
if not isinstance(variables, list):
variables = []
query_params = {
"tool": "variables_list",
"port": port,
"global_only": global_only,
"grep": grep,
}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=variables,
query_params=query_params,
tool_name="variables_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def functions_variables(
self,
address: str,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List local variables and parameters for a specific function.
Args:
address: Function address in hex format
port: Ghidra instance port (optional)
page_size: Variables per page (default: 50, max: 500)
grep: Regex pattern to filter variable names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all variables without pagination
fields: Field names to keep (e.g. ['name', 'type', 'storage']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of function variables
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
response = self.safe_get(port, f"functions/{address}/variables")
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
variables = simplified.get("result", [])
if not isinstance(variables, list):
variables = []
query_params = {
"tool": "functions_variables",
"port": port,
"address": address,
"grep": grep,
}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=variables,
query_params=query_params,
tool_name="functions_variables",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def variables_rename(
self,
function_address: str,
variable_name: str,
new_name: str,
new_type: Optional[str] = None,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Rename a variable (and optionally retype) within a function.
Args:
function_address: Function address in hex format
variable_name: Current name of the variable
new_name: New name for the variable
new_type: New data type (optional, e.g. "int", "char*")
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not function_address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "function_address parameter is required",
},
}
if not variable_name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "variable_name parameter is required",
},
}
if not new_name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "new_name parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
from urllib.parse import quote
payload: dict = {"name": new_name}
if new_type:
payload["data_type"] = new_type
endpoint = f"functions/{function_address}/variables/{quote(variable_name)}"
response = self.safe_patch(port, endpoint, payload)
return self.simplify_response(response)
# Resources
@mcp_resource(uri="ghidra://instance/{port}/variables")
def resource_variables_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List variables (capped).
Args:
port: Ghidra instance port
Returns:
List of variables (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("variables", 1000)
response = self.safe_get(port, "variables", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
variables = simplified.get("result", [])
if not isinstance(variables, list):
variables = []
return {
"variables": variables[:cap],
"count": len(variables),
"capped_at": cap if len(variables) >= cap else None,
"_hint": "Use variables_list() tool for full pagination"
if len(variables) >= cap
else None,
}

View File

@ -6,10 +6,10 @@ Provides tools for cross-reference (xref) operations.
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from .base import GhydraMixinBase
from ..config import get_config
from .base import GhydraMixinBase
class XrefsMixin(GhydraMixinBase):

View File

@ -13,17 +13,23 @@ from typing import Optional
from fastmcp import FastMCP
from .config import get_config, set_config, GhydraConfig
from .config import GhydraConfig, get_config, set_config
from .mixins import (
InstancesMixin,
FunctionsMixin,
DataMixin,
StructsMixin,
AnalysisMixin,
MemoryMixin,
XrefsMixin,
BookmarksMixin,
CursorsMixin,
DataMixin,
DataTypesMixin,
DockerMixin,
FunctionsMixin,
InstancesMixin,
MemoryMixin,
NamespacesMixin,
SegmentsMixin,
StructsMixin,
SymbolsMixin,
VariablesMixin,
XrefsMixin,
)
@ -56,6 +62,12 @@ def create_server(
xrefs_mixin = XrefsMixin()
cursors_mixin = CursorsMixin()
docker_mixin = DockerMixin()
symbols_mixin = SymbolsMixin()
segments_mixin = SegmentsMixin()
variables_mixin = VariablesMixin()
namespaces_mixin = NamespacesMixin()
bookmarks_mixin = BookmarksMixin()
datatypes_mixin = DataTypesMixin()
# Register all mixins with the server
# Each mixin registers its tools, resources, and prompts
@ -68,6 +80,12 @@ def create_server(
xrefs_mixin.register_all(mcp)
cursors_mixin.register_all(mcp)
docker_mixin.register_all(mcp)
symbols_mixin.register_all(mcp)
segments_mixin.register_all(mcp)
variables_mixin.register_all(mcp)
namespaces_mixin.register_all(mcp)
bookmarks_mixin.register_all(mcp)
datatypes_mixin.register_all(mcp)
# Optional feedback collection
cfg = get_config()
@ -87,30 +105,41 @@ def create_server(
def _periodic_discovery(interval: int = 30):
"""Background thread for periodic instance discovery.
Uses a short timeout per port so a full scan completes quickly
even when most ports are unreachable.
Args:
interval: Seconds between discovery attempts
"""
import requests as _requests
from .mixins.base import GhydraMixinBase
from .core.http_client import safe_get
config = get_config()
while True:
time.sleep(interval)
try:
# Quick scan of common ports
# Quick scan — use discovery_timeout (0.5s), NOT request_timeout (30s)
for port in config.quick_discovery_range:
try:
response = safe_get(port, "")
if response.get("success", False):
with GhydraMixinBase._instances_lock:
if port not in GhydraMixinBase._instances:
GhydraMixinBase._instances[port] = {
"url": f"http://{config.ghidra_host}:{port}",
"project": response.get("project", ""),
"file": response.get("file", ""),
"discovered_at": time.time(),
}
url = f"http://{config.ghidra_host}:{port}/"
resp = _requests.get(
url,
timeout=config.discovery_timeout,
headers={"Accept": "application/json"},
)
if resp.ok:
response = resp.json()
if response.get("success", False):
with GhydraMixinBase._instances_lock:
if port not in GhydraMixinBase._instances:
GhydraMixinBase._instances[port] = {
"url": url.rstrip("/"),
"project": response.get("project", ""),
"file": response.get("file", ""),
"discovered_at": time.time(),
}
except Exception:
pass
except Exception:
@ -154,8 +183,8 @@ def main():
# Initial instance discovery
print(f" Discovering Ghidra instances on {config.ghidra_host}...", file=sys.stderr)
from .mixins.base import GhydraMixinBase
from .core.http_client import safe_get
from .mixins.base import GhydraMixinBase
found = 0
for port in config.quick_discovery_range: