Compare commits

..

No commits in common. "458d4fb35bdde29fdaeb8ec772c3edae2be1b64f" and "1b42ab251efb30066024330563ba9c61a1ff4f1d" have entirely different histories.

25 changed files with 88 additions and 2247 deletions

View File

@ -305,7 +305,6 @@ ROUTES = [
("GET", r"^/strings$", "handle_strings"), ("GET", r"^/strings$", "handle_strings"),
# Memory # Memory
("GET", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_get_comment"),
("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"), ("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"),
("GET", r"^/memory/blocks$", "handle_memory_blocks"), ("GET", r"^/memory/blocks$", "handle_memory_blocks"),
("GET", r"^/memory$", "handle_memory_read"), ("GET", r"^/memory$", "handle_memory_read"),
@ -317,26 +316,8 @@ ROUTES = [
# Symbols # Symbols
("GET", r"^/symbols/imports$", "handle_imports"), ("GET", r"^/symbols/imports$", "handle_imports"),
("GET", r"^/symbols/exports$", "handle_exports"), ("GET", r"^/symbols/exports$", "handle_exports"),
("POST", r"^/symbols$", "handle_symbol_create"),
("PATCH", r"^/symbols/([^/]+)$", "handle_symbol_rename"),
("DELETE", r"^/symbols/([^/]+)$", "handle_symbol_delete"),
("GET", r"^/symbols$", "handle_symbols"), ("GET", r"^/symbols$", "handle_symbols"),
# Variables
("PATCH", r"^/functions/([^/]+)/variables/([^/]+)$", "handle_variable_rename"),
("GET", r"^/variables$", "handle_variables"),
# Bookmarks
("POST", r"^/bookmarks$", "handle_bookmark_create"),
("DELETE", r"^/bookmarks/([^/]+)$", "handle_bookmark_delete"),
("GET", r"^/bookmarks$", "handle_bookmarks"),
# Data types
("POST", r"^/datatypes/enums$", "handle_enum_create"),
("GET", r"^/datatypes/enums$", "handle_enums"),
("POST", r"^/datatypes/typedefs$", "handle_typedef_create"),
("GET", r"^/datatypes/typedefs$", "handle_typedefs"),
# Cross-references # Cross-references
("GET", r"^/xrefs$", "handle_xrefs"), ("GET", r"^/xrefs$", "handle_xrefs"),
@ -753,62 +734,26 @@ class GhydraMCPHandler(HttpHandler):
limit = parse_int(params.get("limit"), 100) limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0) offset = parse_int(params.get("offset"), 0)
name_filter = params.get("name") name_filter = params.get("name")
name_contains = params.get("name_contains")
name_regex = params.get("name_matches_regex")
addr_filter = params.get("addr")
grep_pattern = compile_grep(params) grep_pattern = compile_grep(params)
# Compile name regex if provided functions = []
name_regex_pat = None
if name_regex:
try:
name_regex_pat = re.compile(name_regex, re.IGNORECASE)
except:
return {"success": False, "error": {"code": "INVALID_REGEX", "message": "Invalid regex: %s" % name_regex}}
fm = self.program.getFunctionManager() fm = self.program.getFunctionManager()
total = fm.getFunctionCount() total = fm.getFunctionCount()
# Single function lookup by address
if addr_filter:
func = self._find_function_at(addr_filter)
if not func:
return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit}
addr = str(func.getEntryPoint())
return {"success": True, "result": [{
"name": func.getName(),
"address": addr,
"signature": str(func.getSignature()),
"parameterCount": func.getParameterCount(),
"isThunk": func.isThunk(),
"_links": {
"self": make_link("/functions/%s" % addr),
"decompile": make_link("/functions/%s/decompile" % addr),
"disassembly": make_link("/functions/%s/disassembly" % addr),
},
}], "size": 1, "offset": 0, "limit": limit}
functions = []
count = 0 count = 0
skipped = 0 skipped = 0
for func in fm.getFunctions(True): for func in fm.getFunctions(True):
if count >= limit: if count >= limit:
break break
func_name = func.getName() # Apply name filter
# Apply name filters if name_filter and name_filter.lower() not in func.getName().lower():
if name_filter and name_filter.lower() not in func_name.lower():
continue
if name_contains and name_contains.lower() not in func_name.lower():
continue
if name_regex_pat and not name_regex_pat.search(func_name):
continue continue
if skipped < offset: if skipped < offset:
skipped += 1 skipped += 1
continue continue
addr = str(func.getEntryPoint()) addr = str(func.getEntryPoint())
item = { item = {
"name": func_name, "name": func.getName(),
"address": addr, "address": addr,
"signature": str(func.getSignature()), "signature": str(func.getSignature()),
"parameterCount": func.getParameterCount(), "parameterCount": func.getParameterCount(),
@ -1413,36 +1358,6 @@ class GhydraMCPHandler(HttpHandler):
"""List memory blocks (alias for /segments).""" """List memory blocks (alias for /segments)."""
return self.handle_segments(exchange) return self.handle_segments(exchange)
def handle_get_comment(self, exchange, addr_str, comment_type):
"""Get a comment at a specific address."""
if not self.program:
return self._no_program()
ct = COMMENT_TYPE_MAP.get(comment_type.lower())
if ct is None:
return {"success": False, "error": {
"code": "INVALID_COMMENT_TYPE",
"message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type
}}
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
listing = self.program.getListing()
cu = listing.getCodeUnitAt(addr)
if not cu:
cu = listing.getCodeUnitContaining(addr)
if not cu:
return {"success": True, "result": {"address": addr_str, "commentType": comment_type, "comment": None}}
comment = cu.getComment(ct)
return {"success": True, "result": {
"address": addr_str,
"commentType": comment_type,
"comment": comment,
}}
except Exception as e:
return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}}
def handle_set_comment(self, exchange, addr_str, comment_type): def handle_set_comment(self, exchange, addr_str, comment_type):
"""Set a comment at a specific address.""" """Set a comment at a specific address."""
if not self.program: if not self.program:
@ -2197,518 +2112,6 @@ class GhydraMCPHandler(HttpHandler):
except Exception as e: except Exception as e:
return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}} return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}}
# ==================================================================
# Symbol CRUD Handlers
# ==================================================================
def handle_symbol_create(self, exchange):
"""POST /symbols - Create a new label/symbol."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
name = body.get("name", "")
address = body.get("address", "")
if not name or not address:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Both 'name' and 'address' are required"}}
try:
addr = self.program.getAddressFactory().getAddress(address)
st = self.program.getSymbolTable()
def do_create():
st.createLabel(addr, name, SourceType.USER_DEFINED)
with_transaction(self.program, "Create symbol", do_create)
return {"success": True, "result": {
"name": name,
"address": address,
"message": "Symbol created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
def handle_symbol_rename(self, exchange, addr_str):
"""PATCH /symbols/{address} - Rename primary symbol at address."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
new_name = body.get("name", "")
if not new_name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
st = self.program.getSymbolTable()
symbol = st.getPrimarySymbol(addr)
if not symbol:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}
old_name = symbol.getName()
def do_rename():
symbol.setName(new_name, SourceType.USER_DEFINED)
with_transaction(self.program, "Rename symbol", do_rename)
return {"success": True, "result": {
"address": addr_str,
"oldName": old_name,
"newName": new_name,
"message": "Symbol renamed successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
def handle_symbol_delete(self, exchange, addr_str):
"""DELETE /symbols/{address} - Delete primary symbol at address."""
if not self.program:
return self._no_program()
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
st = self.program.getSymbolTable()
symbol = st.getPrimarySymbol(addr)
if not symbol:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}}
name = symbol.getName()
def do_delete():
symbol.delete()
with_transaction(self.program, "Delete symbol", do_delete)
return {"success": True, "result": {
"address": addr_str,
"name": name,
"message": "Symbol deleted successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}}
# ==================================================================
# Variable Rename Handler
# ==================================================================
def handle_variable_rename(self, exchange, addr_str, var_name):
"""PATCH /functions/{address}/variables/{name} - Rename/retype a variable."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
new_name = body.get("name", "")
new_type = body.get("data_type")
if not new_name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}}
try:
# URL-decode the variable name
from java.net import URLDecoder
decoded_name = URLDecoder.decode(var_name, "UTF-8")
func = self._find_function_at(addr_str)
if not func:
return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}}
# Search parameters and local variables
target_var = None
for param in func.getParameters():
if param.getName() == decoded_name:
target_var = param
break
if not target_var:
for var in func.getAllVariables():
if var.getName() == decoded_name:
target_var = var
break
if not target_var:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "Variable '%s' not found in function" % decoded_name}}
old_name = target_var.getName()
def do_rename():
target_var.setName(new_name, SourceType.USER_DEFINED)
if new_type:
dtm = self.program.getDataTypeManager()
dt = resolve_data_type(dtm, new_type)
if dt:
target_var.setDataType(dt, SourceType.USER_DEFINED)
with_transaction(self.program, "Rename variable", do_rename)
return {"success": True, "result": {
"function": addr_str,
"oldName": old_name,
"newName": new_name,
"message": "Variable renamed successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "VARIABLE_ERROR", "message": str(e)}}
# ==================================================================
# Variables Handler
# ==================================================================
def handle_variables(self, exchange):
"""GET /variables - List global and function variables."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
global_only = params.get("global_only", "false").lower() == "true"
search = params.get("search", "")
grep_pattern = compile_grep(params)
variables = []
count = 0
skipped = 0
# Function variables (parameters + locals)
if not global_only:
fm = self.program.getFunctionManager()
for func in fm.getFunctions(True):
if count >= limit:
break
func_name = func.getName()
func_addr = str(func.getEntryPoint())
for param in func.getParameters():
if count >= limit:
break
var_name = param.getName()
if search and search.lower() not in var_name.lower():
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": var_name,
"type": str(param.getDataType()),
"storage": str(param.getVariableStorage()),
"scope": "parameter",
"function": func_name,
"functionAddress": func_addr,
}
if not grep_matches_item(item, grep_pattern):
continue
variables.append(item)
count += 1
for var in func.getLocalVariables():
if count >= limit:
break
var_name = var.getName()
if search and search.lower() not in var_name.lower():
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": var_name,
"type": str(var.getDataType()),
"storage": str(var.getVariableStorage()),
"scope": "local",
"function": func_name,
"functionAddress": func_addr,
}
if not grep_matches_item(item, grep_pattern):
continue
variables.append(item)
count += 1
# Global variables (defined data with symbol names)
listing = self.program.getListing()
st = self.program.getSymbolTable()
for data in listing.getDefinedData(True):
if count >= limit:
break
addr = data.getAddress()
symbol = st.getPrimarySymbol(addr)
if not symbol:
continue
sym_name = symbol.getName()
# Skip auto-generated names
if sym_name.startswith("DAT_") or sym_name.startswith("s_"):
continue
if search and search.lower() not in sym_name.lower():
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": sym_name,
"address": str(addr),
"type": str(data.getDataType()),
"scope": "global",
"size": data.getLength(),
}
if not grep_matches_item(item, grep_pattern):
continue
variables.append(item)
count += 1
return {"success": True, "result": variables, "offset": offset, "limit": limit}
# ==================================================================
# Bookmarks Handlers
# ==================================================================
def handle_bookmarks(self, exchange):
"""GET /bookmarks - List bookmarks with optional filtering."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
type_filter = params.get("type")
category_filter = params.get("category")
grep_pattern = compile_grep(params)
bm = self.program.getBookmarkManager()
bookmarks = []
count = 0
skipped = 0
# Get bookmark types to iterate
if type_filter:
bm_types = [type_filter]
else:
bm_types = [str(t) for t in bm.getBookmarkTypes()]
for btype in bm_types:
if count >= limit:
break
try:
it = bm.getBookmarksIterator(btype)
except:
continue
while it.hasNext() and count < limit:
bookmark = it.next()
if category_filter and bookmark.getCategory() != category_filter:
continue
if skipped < offset:
skipped += 1
continue
item = {
"address": str(bookmark.getAddress()),
"type": bookmark.getTypeString(),
"category": bookmark.getCategory(),
"comment": bookmark.getComment(),
}
if not grep_matches_item(item, grep_pattern):
continue
bookmarks.append(item)
count += 1
return {"success": True, "result": bookmarks, "offset": offset, "limit": limit}
def handle_bookmark_create(self, exchange):
"""POST /bookmarks - Create a bookmark."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
address = body.get("address", "")
btype = body.get("type", "Note")
category = body.get("category", "")
comment = body.get("comment", "")
if not address:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'address' is required"}}
try:
addr = self.program.getAddressFactory().getAddress(address)
bm = self.program.getBookmarkManager()
def do_create():
bm.setBookmark(addr, btype, category, comment)
with_transaction(self.program, "Create bookmark", do_create)
return {"success": True, "result": {
"address": address,
"type": btype,
"category": category,
"comment": comment,
"message": "Bookmark created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}
def handle_bookmark_delete(self, exchange, addr_str):
"""DELETE /bookmarks/{address} - Delete all bookmarks at address."""
if not self.program:
return self._no_program()
try:
addr = self.program.getAddressFactory().getAddress(addr_str)
bm = self.program.getBookmarkManager()
removed = []
def do_delete():
for bookmark in list(bm.getBookmarks(addr)):
removed.append(bookmark.getTypeString())
bookmark.remove() if hasattr(bookmark, 'remove') else bm.removeBookmark(bookmark)
with_transaction(self.program, "Delete bookmarks", do_delete)
return {"success": True, "result": {
"address": addr_str,
"removedTypes": removed,
"count": len(removed),
"message": "Bookmarks deleted successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}}
# ==================================================================
# Enum Handlers
# ==================================================================
def handle_enums(self, exchange):
"""GET /datatypes/enums - List enum data types."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
grep_pattern = compile_grep(params)
from ghidra.program.model.data import Enum as GhidraEnum
dtm = self.program.getDataTypeManager()
enums = []
count = 0
skipped = 0
for dt in dtm.getAllDataTypes():
if count >= limit:
break
if not isinstance(dt, GhidraEnum):
continue
if skipped < offset:
skipped += 1
continue
# Get enum members
members = []
for name in dt.getNames():
members.append({"name": name, "value": dt.getValue(name)})
item = {
"name": dt.getName(),
"category": str(dt.getCategoryPath()),
"size": dt.getLength(),
"members": members,
}
if not grep_matches_item(item, grep_pattern):
continue
enums.append(item)
count += 1
return {"success": True, "result": enums, "offset": offset, "limit": limit}
def handle_enum_create(self, exchange):
"""POST /datatypes/enums - Create a new enum."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
name = body.get("name", "")
size = int(body.get("size", 4))
if not name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' is required"}}
try:
from ghidra.program.model.data import EnumDataType, CategoryPath
dtm = self.program.getDataTypeManager()
new_enum = EnumDataType(name, size)
def do_create():
dtm.addDataType(new_enum, None)
with_transaction(self.program, "Create enum", do_create)
return {"success": True, "result": {
"name": name,
"size": size,
"message": "Enum created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "ENUM_ERROR", "message": str(e)}}
# ==================================================================
# Typedef Handlers
# ==================================================================
def handle_typedefs(self, exchange):
"""GET /datatypes/typedefs - List typedef data types."""
if not self.program:
return self._no_program()
params = parse_query_params(exchange)
limit = parse_int(params.get("limit"), 100)
offset = parse_int(params.get("offset"), 0)
grep_pattern = compile_grep(params)
from ghidra.program.model.data import TypeDef
dtm = self.program.getDataTypeManager()
typedefs = []
count = 0
skipped = 0
for dt in dtm.getAllDataTypes():
if count >= limit:
break
if not isinstance(dt, TypeDef):
continue
if skipped < offset:
skipped += 1
continue
item = {
"name": dt.getName(),
"category": str(dt.getCategoryPath()),
"baseType": dt.getBaseDataType().getName() if dt.getBaseDataType() else None,
"size": dt.getLength(),
}
if not grep_matches_item(item, grep_pattern):
continue
typedefs.append(item)
count += 1
return {"success": True, "result": typedefs, "offset": offset, "limit": limit}
def handle_typedef_create(self, exchange):
"""POST /datatypes/typedefs - Create a new typedef."""
if not self.program:
return self._no_program()
body = parse_json_body(exchange)
name = body.get("name", "")
base_type_name = body.get("base_type", "")
if not name or not base_type_name:
return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' and 'base_type' are required"}}
try:
from ghidra.program.model.data import TypedefDataType
dtm = self.program.getDataTypeManager()
# Use the shared resolver which handles builtins + path lookups
base_dt = resolve_data_type(dtm, base_type_name)
if not base_dt:
return {"success": False, "error": {"code": "NOT_FOUND", "message": "Base type not found: %s" % base_type_name}}
new_typedef = TypedefDataType(name, base_dt)
def do_create():
dtm.addDataType(new_typedef, None)
with_transaction(self.program, "Create typedef", do_create)
return {"success": True, "result": {
"name": name,
"baseType": base_type_name,
"message": "Typedef created successfully",
}}
except Exception as e:
return {"success": False, "error": {"code": "TYPEDEF_ERROR", "message": str(e)}}
# ================================================================== # ==================================================================
# Legacy Compatibility # Legacy Compatibility
# ================================================================== # ==================================================================

View File

@ -97,14 +97,6 @@ class GhydraConfig:
"data": 1000, "data": 1000,
"structs": 500, "structs": 500,
"xrefs": 500, "xrefs": 500,
"symbols": 1000,
"segments": 500,
"variables": 1000,
"namespaces": 500,
"classes": 500,
"bookmarks": 1000,
"enums": 500,
"typedefs": 500,
}) })
def __post_init__(self): def __post_init__(self):

View File

@ -3,38 +3,38 @@
Contains HTTP client, pagination, progress reporting, and logging utilities. Contains HTTP client, pagination, progress reporting, and logging utilities.
""" """
from .filtering import (
apply_grep,
estimate_and_guard,
project_fields,
)
from .http_client import ( from .http_client import (
get_instance_url,
safe_delete,
safe_get, safe_get,
safe_patch,
safe_post, safe_post,
safe_put, safe_put,
safe_patch,
safe_delete,
simplify_response, simplify_response,
) get_instance_url,
from .logging import (
log_debug,
log_error,
log_info,
log_warning,
) )
from .pagination import ( from .pagination import (
CursorManager, CursorManager,
CursorState, CursorState,
estimate_tokens,
get_cursor_manager,
paginate_response, paginate_response,
get_cursor_manager,
estimate_tokens,
) )
from .progress import ( from .progress import (
ProgressReporter, ProgressReporter,
report_progress, report_progress,
report_step, report_step,
) )
from .filtering import (
project_fields,
apply_grep,
estimate_and_guard,
)
from .logging import (
log_info,
log_debug,
log_warning,
log_error,
)
__all__ = [ __all__ = [
# HTTP client # HTTP client

View File

@ -11,6 +11,7 @@ from typing import Any, Dict, Optional
from ..config import get_config from ..config import get_config
# Token estimation (same ratio as pagination.py) # Token estimation (same ratio as pagination.py)
TOKEN_ESTIMATION_RATIO = 4.0 TOKEN_ESTIMATION_RATIO = 4.0

View File

@ -12,6 +12,7 @@ import requests
from ..config import get_config from ..config import get_config
# Allowed origins for CORS-like validation # Allowed origins for CORS-like validation
ALLOWED_ORIGINS = { ALLOWED_ORIGINS = {
"http://localhost", "http://localhost",

View File

@ -5,7 +5,7 @@ client-visible logging when available, with fallback to standard logging.
""" """
import logging import logging
from typing import TYPE_CHECKING, Optional from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from mcp.server.fastmcp import Context from mcp.server.fastmcp import Context

View File

@ -14,7 +14,8 @@ from threading import Lock
from typing import Any, Dict, List, Optional, Tuple from typing import Any, Dict, List, Optional, Tuple
from ..config import get_config from ..config import get_config
from .filtering import estimate_and_guard, project_fields from .filtering import project_fields, estimate_and_guard
# ReDoS Protection Configuration # ReDoS Protection Configuration
MAX_GREP_PATTERN_LENGTH = 500 MAX_GREP_PATTERN_LENGTH = 500

View File

@ -4,7 +4,7 @@ Provides async progress reporting using FastMCP's Context for
real-time progress notifications to MCP clients. real-time progress notifications to MCP clients.
""" """
from typing import TYPE_CHECKING, Optional from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from mcp.server.fastmcp import Context from mcp.server.fastmcp import Context

View File

@ -4,22 +4,16 @@ Domain-specific mixins that organize tools, resources, and prompts by functional
Uses FastMCP's contrib.mcp_mixin pattern for clean modular organization. Uses FastMCP's contrib.mcp_mixin pattern for clean modular organization.
""" """
from .analysis import AnalysisMixin
from .base import GhydraMixinBase from .base import GhydraMixinBase
from .bookmarks import BookmarksMixin
from .cursors import CursorsMixin
from .data import DataMixin
from .datatypes import DataTypesMixin
from .docker import DockerMixin
from .functions import FunctionsMixin
from .instances import InstancesMixin from .instances import InstancesMixin
from .memory import MemoryMixin from .functions import FunctionsMixin
from .namespaces import NamespacesMixin from .data import DataMixin
from .segments import SegmentsMixin
from .structs import StructsMixin from .structs import StructsMixin
from .symbols import SymbolsMixin from .analysis import AnalysisMixin
from .variables import VariablesMixin from .memory import MemoryMixin
from .xrefs import XrefsMixin from .xrefs import XrefsMixin
from .cursors import CursorsMixin
from .docker import DockerMixin
__all__ = [ __all__ = [
"GhydraMixinBase", "GhydraMixinBase",
@ -32,10 +26,4 @@ __all__ = [
"XrefsMixin", "XrefsMixin",
"CursorsMixin", "CursorsMixin",
"DockerMixin", "DockerMixin",
"SymbolsMixin",
"SegmentsMixin",
"VariablesMixin",
"NamespacesMixin",
"BookmarksMixin",
"DataTypesMixin",
] ]

View File

@ -8,8 +8,8 @@ from typing import Any, Dict, List, Optional
from fastmcp import Context from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool from fastmcp.contrib.mcp_mixin import mcp_tool
from ..config import get_config
from .base import GhydraMixinBase from .base import GhydraMixinBase
from ..config import get_config
class AnalysisMixin(GhydraMixinBase): class AnalysisMixin(GhydraMixinBase):
@ -277,40 +277,6 @@ class AnalysisMixin(GhydraMixinBase):
response = self.safe_get(port, "function") response = self.safe_get(port, "function")
return self.simplify_response(response) return self.simplify_response(response)
@mcp_tool()
def comments_get(
self,
address: str,
comment_type: str = "plate",
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Get a comment at the specified address.
Args:
address: Memory address in hex format
comment_type: "plate", "pre", "post", "eol", "repeatable"
port: Ghidra instance port (optional)
Returns:
Comment text and metadata
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, f"memory/{address}/comments/{comment_type}")
return self.simplify_response(response)
@mcp_tool() @mcp_tool()
def comments_set( def comments_set(
self, self,

View File

@ -11,16 +11,9 @@ from fastmcp import Context
from fastmcp.contrib.mcp_mixin import MCPMixin from fastmcp.contrib.mcp_mixin import MCPMixin
from ..config import get_config from ..config import get_config
from ..core.http_client import ( from ..core.http_client import safe_get, safe_post, safe_put, safe_patch, safe_delete, simplify_response
safe_delete,
safe_get,
safe_patch,
safe_post,
safe_put,
simplify_response,
)
from ..core.logging import log_debug, log_error, log_info, log_warning
from ..core.pagination import paginate_response from ..core.pagination import paginate_response
from ..core.logging import log_info, log_debug, log_warning, log_error
class GhydraMixinBase(MCPMixin): class GhydraMixinBase(MCPMixin):

View File

@ -1,171 +0,0 @@
"""Bookmarks mixin for GhydraMCP.
Provides tools for managing Ghidra bookmarks (annotations at addresses).
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class BookmarksMixin(GhydraMixinBase):
"""Mixin for bookmark operations.
Provides tools for:
- Listing bookmarks with type/category filtering
- Creating bookmarks at addresses
- Deleting bookmarks
"""
@mcp_tool()
def bookmarks_list(
self,
type: Optional[str] = None,
category: Optional[str] = None,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List bookmarks with optional type/category filtering.
Args:
type: Filter by bookmark type (e.g. "Note", "Warning", "Error", "Info")
category: Filter by bookmark category
port: Ghidra instance port (optional)
page_size: Bookmarks per page (default: 50, max: 500)
grep: Regex pattern to filter bookmark comments
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all bookmarks without pagination
fields: Field names to keep (e.g. ['address', 'type', 'comment']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of bookmarks
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("bookmarks", 1000)
params: Dict[str, Any] = {"limit": cap}
if type:
params["type"] = type
if category:
params["category"] = category
response = self.safe_get(port, "bookmarks", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
bookmarks = simplified.get("result", [])
if not isinstance(bookmarks, list):
bookmarks = []
query_params = {
"tool": "bookmarks_list",
"port": port,
"type": type,
"category": category,
"grep": grep,
}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=bookmarks,
query_params=query_params,
tool_name="bookmarks_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def bookmarks_create(
self,
address: str,
type: str = "Note",
category: str = "",
comment: str = "",
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a bookmark at the specified address.
Args:
address: Memory address in hex format
type: Bookmark type (default: "Note"). Common types: Note, Warning, Error, Info
category: Bookmark category (optional grouping string)
comment: Bookmark comment text
port: Ghidra instance port (optional)
Returns:
Created bookmark information
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {
"address": address,
"type": type,
"category": category,
"comment": comment,
}
response = self.safe_post(port, "bookmarks", payload)
return self.simplify_response(response)
@mcp_tool()
def bookmarks_delete(
self,
address: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Delete all bookmarks at the specified address.
Args:
address: Memory address in hex format
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_delete(port, f"bookmarks/{address}")
return self.simplify_response(response)

View File

@ -8,8 +8,8 @@ from typing import Any, Dict, Optional
from fastmcp import Context from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool from fastmcp.contrib.mcp_mixin import mcp_tool
from ..core.pagination import get_cursor_manager
from .base import GhydraMixinBase from .base import GhydraMixinBase
from ..core.pagination import get_cursor_manager
class CursorsMixin(GhydraMixinBase): class CursorsMixin(GhydraMixinBase):

View File

@ -6,10 +6,10 @@ Provides tools for data items and strings operations.
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from fastmcp import Context from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from ..config import get_config
from .base import GhydraMixinBase from .base import GhydraMixinBase
from ..config import get_config
class DataMixin(GhydraMixinBase): class DataMixin(GhydraMixinBase):

View File

@ -1,217 +0,0 @@
"""Data types mixin for GhydraMCP.
Provides tools for managing enum and typedef data types.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class DataTypesMixin(GhydraMixinBase):
"""Mixin for enum and typedef data type operations.
Provides tools for:
- Listing and creating enum data types
- Listing and creating typedef data types
"""
# --- Enums ---
@mcp_tool()
def enums_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List enum data types with their members.
Args:
port: Ghidra instance port (optional)
page_size: Enums per page (default: 50, max: 500)
grep: Regex pattern to filter enum names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all enums without pagination
fields: Field names to keep (e.g. ['name', 'size', 'members']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of enum data types
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("enums", 500)
response = self.safe_get(port, "datatypes/enums", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
enums = simplified.get("result", [])
if not isinstance(enums, list):
enums = []
query_params = {"tool": "enums_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=enums,
query_params=query_params,
tool_name="enums_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def enums_create(
self,
name: str,
size: int = 4,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new enum data type.
Args:
name: Name for the new enum
size: Size in bytes (default: 4)
port: Ghidra instance port (optional)
Returns:
Created enum information
"""
if not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "name parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": name, "size": size}
response = self.safe_post(port, "datatypes/enums", payload)
return self.simplify_response(response)
# --- Typedefs ---
@mcp_tool()
def typedefs_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List typedef data types.
Args:
port: Ghidra instance port (optional)
page_size: Typedefs per page (default: 50, max: 500)
grep: Regex pattern to filter typedef names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all typedefs without pagination
fields: Field names to keep (e.g. ['name', 'base_type']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of typedef data types
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("typedefs", 500)
response = self.safe_get(port, "datatypes/typedefs", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
typedefs = simplified.get("result", [])
if not isinstance(typedefs, list):
typedefs = []
query_params = {"tool": "typedefs_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=typedefs,
query_params=query_params,
tool_name="typedefs_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def typedefs_create(
self,
name: str,
base_type: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new typedef data type.
Args:
name: Name for the new typedef
base_type: Name of the base data type (e.g. "int", "uint32_t", "char*")
port: Ghidra instance port (optional)
Returns:
Created typedef information
"""
if not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "name parameter is required",
},
}
if not base_type:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "base_type parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": name, "base_type": base_type}
response = self.safe_post(port, "datatypes/typedefs", payload)
return self.simplify_response(response)

View File

@ -16,11 +16,14 @@ import subprocess
import time import time
import uuid import uuid
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional, Set
from fastmcp import Context from fastmcp import Context
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool
from ..config import get_config, get_docker_config
# Port pool configuration # Port pool configuration
PORT_POOL_START = 8192 PORT_POOL_START = 8192
PORT_POOL_END = 8199 PORT_POOL_END = 8199
@ -820,9 +823,9 @@ class DockerMixin(MCPMixin):
Returns: Returns:
Health status and API info if available Health status and API info if available
""" """
import json as json_module
import urllib.error
import urllib.request import urllib.request
import urllib.error
import json as json_module
url = f"http://localhost:{port}/" url = f"http://localhost:{port}/"
@ -925,20 +928,10 @@ class DockerMixin(MCPMixin):
Returns: Returns:
Instance connection info with session ID and port details Instance connection info with session ID and port details
""" """
import os
requested_name = os.path.basename(binary_path)
def _is_same_binary(health_program: str) -> bool:
"""Check if a running instance has the same binary loaded."""
if not health_program:
return False
return os.path.basename(health_program) == requested_name
# If port is specified, check that specific port # If port is specified, check that specific port
if port is not None: if port is not None:
health = await self.docker_health(port=port, ctx=ctx) health = await self.docker_health(port=port, ctx=ctx)
if health.get("healthy") and _is_same_binary(health.get("program", "")): if health.get("healthy"):
return { return {
"source": "existing", "source": "existing",
"session_id": self.session_id, "session_id": self.session_id,
@ -948,10 +941,10 @@ class DockerMixin(MCPMixin):
"message": "Using existing Ghidra instance", "message": "Using existing Ghidra instance",
} }
else: else:
# Check all pooled ports for an instance with the SAME binary # Check all pooled ports for an existing instance
for check_port in range(PORT_POOL_START, PORT_POOL_END + 1): for check_port in range(PORT_POOL_START, PORT_POOL_END + 1):
health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx) health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx)
if health.get("healthy") and _is_same_binary(health.get("program", "")): if health.get("healthy"):
return { return {
"source": "existing", "source": "existing",
"session_id": self.session_id, "session_id": self.session_id,

View File

@ -7,10 +7,10 @@ from typing import Any, Dict, List, Optional
from urllib.parse import quote from urllib.parse import quote
from fastmcp import Context from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from ..config import get_config
from .base import GhydraMixinBase from .base import GhydraMixinBase
from ..config import get_config
class FunctionsMixin(GhydraMixinBase): class FunctionsMixin(GhydraMixinBase):
@ -28,9 +28,6 @@ class FunctionsMixin(GhydraMixinBase):
@mcp_tool() @mcp_tool()
def functions_list( def functions_list(
self, self,
name_contains: Optional[str] = None,
name_regex: Optional[str] = None,
address: Optional[str] = None,
port: Optional[int] = None, port: Optional[int] = None,
page_size: int = 50, page_size: int = 50,
grep: Optional[str] = None, grep: Optional[str] = None,
@ -39,15 +36,12 @@ class FunctionsMixin(GhydraMixinBase):
fields: Optional[List[str]] = None, fields: Optional[List[str]] = None,
ctx: Optional[Context] = None, ctx: Optional[Context] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""List functions with cursor-based pagination and server-side filtering. """List functions with cursor-based pagination.
Args: Args:
name_contains: Server-side substring filter on function name (faster than grep for large binaries)
name_regex: Server-side regex filter on function name
address: Filter by exact function address (hex)
port: Ghidra instance port (optional) port: Ghidra instance port (optional)
page_size: Functions per page (default: 50, max: 500) page_size: Functions per page (default: 50, max: 500)
grep: Client-side regex pattern to filter function names grep: Regex pattern to filter function names
grep_ignorecase: Case-insensitive grep (default: True) grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all functions without pagination return_all: Return all functions without pagination
fields: Field names to keep (e.g. ['name', 'address']). Reduces response size. fields: Field names to keep (e.g. ['name', 'address']). Reduces response size.
@ -62,15 +56,7 @@ class FunctionsMixin(GhydraMixinBase):
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config() config = get_config()
params = {"limit": 10000} response = self.safe_get(port, "functions", {"limit": 10000})
if name_contains:
params["name_contains"] = name_contains
if name_regex:
params["name_matches_regex"] = name_regex
if address:
params["addr"] = address
response = self.safe_get(port, "functions", params)
simplified = self.simplify_response(response) simplified = self.simplify_response(response)
if not simplified.get("success", True): if not simplified.get("success", True):
@ -80,14 +66,7 @@ class FunctionsMixin(GhydraMixinBase):
if not isinstance(functions, list): if not isinstance(functions, list):
functions = [] functions = []
query_params = { query_params = {"tool": "functions_list", "port": port, "grep": grep}
"tool": "functions_list",
"port": port,
"name_contains": name_contains,
"name_regex": name_regex,
"address": address,
"grep": grep,
}
session_id = self._get_session_id(ctx) session_id = self._get_session_id(ctx)
return self.filtered_paginate( return self.filtered_paginate(
@ -488,7 +467,7 @@ class FunctionsMixin(GhydraMixinBase):
"functions": functions[:cap], "functions": functions[:cap],
"count": len(functions), "count": len(functions),
"capped_at": cap if len(functions) >= cap else None, "capped_at": cap if len(functions) >= cap else None,
"_hint": "Use functions_list() tool for full pagination" if len(functions) >= cap else None, "_hint": f"Use functions_list() tool for full pagination" if len(functions) >= cap else None,
} }
@mcp_resource(uri="ghidra://instance/{port}/function/decompile/address/{address}") @mcp_resource(uri="ghidra://instance/{port}/function/decompile/address/{address}")

View File

@ -4,12 +4,13 @@ Provides tools for discovering, registering, and managing Ghidra instances.
""" """
import time import time
from typing import Any, Dict, Optional from typing import Any, Dict, List, Optional
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from ..config import get_config
from .base import GhydraMixinBase from .base import GhydraMixinBase
from ..config import get_config
from ..core.http_client import safe_get
class InstancesMixin(GhydraMixinBase): class InstancesMixin(GhydraMixinBase):
@ -166,20 +167,13 @@ class InstancesMixin(GhydraMixinBase):
Returns: Returns:
Confirmation message with instance details Confirmation message with instance details
""" """
# Register lazily without blocking HTTP calls.
# If the instance is unknown, create a stub entry — the first
# actual tool call (functions_list, etc.) will validate the
# connection and fail fast with a clear error if unreachable.
with self._instances_lock: with self._instances_lock:
if port not in self._instances: needs_register = port not in self._instances
config = get_config()
self._instances[port] = { if needs_register:
"url": f"http://{config.ghidra_host}:{port}", result = self.register_instance(port)
"project": "", if "Failed" in result or "Error" in result:
"file": "", return result
"registered_at": time.time(),
"lazy": True,
}
self.set_current_port(port) self.set_current_port(port)
@ -217,25 +211,6 @@ class InstancesMixin(GhydraMixinBase):
return {"port": port, "status": "registered but no details available"} return {"port": port, "status": "registered but no details available"}
@mcp_tool()
def program_info(self, port: Optional[int] = None) -> Dict[str, Any]:
"""Get full program metadata (architecture, language, compiler, image base, memory size).
Args:
port: Ghidra instance port (optional)
Returns:
Program metadata including architecture, language, compiler spec,
image base address, and total memory size
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, "program")
return self.simplify_response(response)
@mcp_resource(uri="ghidra://instances") @mcp_resource(uri="ghidra://instances")
def resource_instances_list(self) -> Dict[str, Any]: def resource_instances_list(self) -> Dict[str, Any]:
"""MCP Resource: List all active Ghidra instances. """MCP Resource: List all active Ghidra instances.
@ -322,21 +297,3 @@ class InstancesMixin(GhydraMixinBase):
"string_count": string_count, "string_count": string_count,
"port": port, "port": port,
} }
@mcp_resource(uri="ghidra://instance/{port}/program")
def resource_program_info(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: Get program metadata for a Ghidra instance.
Args:
port: Ghidra instance port
Returns:
Program metadata (architecture, language, compiler, image base)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
response = self.safe_get(port, "program")
return self.simplify_response(response)

View File

@ -1,211 +0,0 @@
"""Namespaces mixin for GhydraMCP.
Provides tools for querying namespaces and class definitions.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class NamespacesMixin(GhydraMixinBase):
"""Mixin for namespace and class operations.
Provides tools for:
- Listing all non-global namespaces
- Listing class namespaces with qualified names
"""
@mcp_tool()
def namespaces_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List all non-global namespaces with pagination.
Args:
port: Ghidra instance port (optional)
page_size: Namespaces per page (default: 50, max: 500)
grep: Regex pattern to filter namespace names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all namespaces without pagination
fields: Field names to keep (e.g. ['name', 'id']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of namespaces
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("namespaces", 500)
response = self.safe_get(port, "namespaces", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
namespaces = simplified.get("result", [])
if not isinstance(namespaces, list):
namespaces = []
query_params = {"tool": "namespaces_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=namespaces,
query_params=query_params,
tool_name="namespaces_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def classes_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List class namespaces with qualified names.
Args:
port: Ghidra instance port (optional)
page_size: Classes per page (default: 50, max: 500)
grep: Regex pattern to filter class names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all classes without pagination
fields: Field names to keep (e.g. ['name', 'qualified_name']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of class namespaces
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("classes", 500)
response = self.safe_get(port, "classes", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
classes = simplified.get("result", [])
if not isinstance(classes, list):
classes = []
query_params = {"tool": "classes_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=classes,
query_params=query_params,
tool_name="classes_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
# Resources
@mcp_resource(uri="ghidra://instance/{port}/namespaces")
def resource_namespaces_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List namespaces (capped).
Args:
port: Ghidra instance port
Returns:
List of namespaces (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("namespaces", 500)
response = self.safe_get(port, "namespaces", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
namespaces = simplified.get("result", [])
if not isinstance(namespaces, list):
namespaces = []
return {
"namespaces": namespaces[:cap],
"count": len(namespaces),
"capped_at": cap if len(namespaces) >= cap else None,
"_hint": "Use namespaces_list() tool for full pagination"
if len(namespaces) >= cap
else None,
}
@mcp_resource(uri="ghidra://instance/{port}/classes")
def resource_classes_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List classes (capped).
Args:
port: Ghidra instance port
Returns:
List of class namespaces (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("classes", 500)
response = self.safe_get(port, "classes", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
classes = simplified.get("result", [])
if not isinstance(classes, list):
classes = []
return {
"classes": classes[:cap],
"count": len(classes),
"capped_at": cap if len(classes) >= cap else None,
"_hint": "Use classes_list() tool for full pagination"
if len(classes) >= cap
else None,
}

View File

@ -1,122 +0,0 @@
"""Segments mixin for GhydraMCP.
Provides tools for querying memory segments (sections) and their permissions.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class SegmentsMixin(GhydraMixinBase):
"""Mixin for memory segment operations.
Provides tools for:
- Listing memory segments with permissions and size info
"""
@mcp_tool()
def segments_list(
self,
name: Optional[str] = None,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List memory segments with R/W/X permissions and size info.
Args:
name: Filter by segment name (server-side, exact match)
port: Ghidra instance port (optional)
page_size: Segments per page (default: 50, max: 500)
grep: Regex pattern to filter segment names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all segments without pagination
fields: Field names to keep (e.g. ['name', 'start', 'permissions']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of memory segments
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("segments", 500)
params = {"limit": cap}
if name:
params["name"] = name
response = self.safe_get(port, "segments", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
segments = simplified.get("result", [])
if not isinstance(segments, list):
segments = []
query_params = {"tool": "segments_list", "port": port, "name": name, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=segments,
query_params=query_params,
tool_name="segments_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
# Resources
@mcp_resource(uri="ghidra://instance/{port}/segments")
def resource_segments_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List memory segments (capped).
Args:
port: Ghidra instance port
Returns:
List of memory segments (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("segments", 500)
response = self.safe_get(port, "segments", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
segments = simplified.get("result", [])
if not isinstance(segments, list):
segments = []
return {
"segments": segments[:cap],
"count": len(segments),
"capped_at": cap if len(segments) >= cap else None,
"_hint": "Use segments_list() tool for full pagination"
if len(segments) >= cap
else None,
}

View File

@ -6,10 +6,10 @@ Provides tools for struct data type operations.
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from fastmcp import Context from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from ..config import get_config
from .base import GhydraMixinBase from .base import GhydraMixinBase
from ..config import get_config
class StructsMixin(GhydraMixinBase): class StructsMixin(GhydraMixinBase):

View File

@ -1,422 +0,0 @@
"""Symbols mixin for GhydraMCP.
Provides tools for symbol table operations including labels, imports, and exports.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class SymbolsMixin(GhydraMixinBase):
"""Mixin for symbol table operations.
Provides tools for:
- Listing all symbols with pagination
- Querying imported symbols (external references)
- Querying exported symbols (entry points)
"""
@mcp_tool()
def symbols_list(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List symbols with cursor-based pagination.
Args:
port: Ghidra instance port (optional)
page_size: Symbols per page (default: 50, max: 500)
grep: Regex pattern to filter symbol names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all symbols without pagination
fields: Field names to keep (e.g. ['name', 'address']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of symbols
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
symbols = simplified.get("result", [])
if not isinstance(symbols, list):
symbols = []
query_params = {"tool": "symbols_list", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=symbols,
query_params=query_params,
tool_name="symbols_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def symbols_imports(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List imported symbols (external references) with pagination.
Args:
port: Ghidra instance port (optional)
page_size: Imports per page (default: 50, max: 500)
grep: Regex pattern to filter import names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all imports without pagination
fields: Field names to keep. Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of imported symbols
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols/imports", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
imports = simplified.get("result", [])
if not isinstance(imports, list):
imports = []
query_params = {"tool": "symbols_imports", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=imports,
query_params=query_params,
tool_name="symbols_imports",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def symbols_exports(
self,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List exported symbols (entry points) with pagination.
Args:
port: Ghidra instance port (optional)
page_size: Exports per page (default: 50, max: 500)
grep: Regex pattern to filter export names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all exports without pagination
fields: Field names to keep. Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of exported symbols
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols/exports", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
exports = simplified.get("result", [])
if not isinstance(exports, list):
exports = []
query_params = {"tool": "symbols_exports", "port": port, "grep": grep}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=exports,
query_params=query_params,
tool_name="symbols_exports",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def symbols_create(
self,
name: str,
address: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Create a new label/symbol at the specified address.
Args:
name: Name for the new symbol
address: Memory address in hex format
port: Ghidra instance port (optional)
Returns:
Created symbol information
"""
if not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "name parameter is required",
},
}
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": name, "address": address}
response = self.safe_post(port, "symbols", payload)
return self.simplify_response(response)
@mcp_tool()
def symbols_rename(
self,
address: str,
new_name: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Rename the primary symbol at the specified address.
Args:
address: Memory address of the symbol in hex format
new_name: New name for the symbol
port: Ghidra instance port (optional)
Returns:
Operation result with old and new names
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
if not new_name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "new_name parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"name": new_name}
response = self.safe_patch(port, f"symbols/{address}", payload)
return self.simplify_response(response)
@mcp_tool()
def symbols_delete(
self,
address: str,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Delete the primary symbol at the specified address.
Args:
address: Memory address of the symbol in hex format
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_delete(port, f"symbols/{address}")
return self.simplify_response(response)
# Resources
@mcp_resource(uri="ghidra://instance/{port}/symbols")
def resource_symbols_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List symbols (capped).
Args:
port: Ghidra instance port
Returns:
List of symbols (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
symbols = simplified.get("result", [])
if not isinstance(symbols, list):
symbols = []
return {
"symbols": symbols[:cap],
"count": len(symbols),
"capped_at": cap if len(symbols) >= cap else None,
"_hint": "Use symbols_list() tool for full pagination" if len(symbols) >= cap else None,
}
@mcp_resource(uri="ghidra://instance/{port}/symbols/imports")
def resource_symbols_imports(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List imported symbols (capped).
Args:
port: Ghidra instance port
Returns:
List of imported symbols (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols/imports", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
imports = simplified.get("result", [])
if not isinstance(imports, list):
imports = []
return {
"imports": imports[:cap],
"count": len(imports),
"capped_at": cap if len(imports) >= cap else None,
"_hint": "Use symbols_imports() tool for full pagination"
if len(imports) >= cap
else None,
}
@mcp_resource(uri="ghidra://instance/{port}/symbols/exports")
def resource_symbols_exports(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List exported symbols (capped).
Args:
port: Ghidra instance port
Returns:
List of exported symbols (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("symbols", 1000)
response = self.safe_get(port, "symbols/exports", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
exports = simplified.get("result", [])
if not isinstance(exports, list):
exports = []
return {
"exports": exports[:cap],
"count": len(exports),
"capped_at": cap if len(exports) >= cap else None,
"_hint": "Use symbols_exports() tool for full pagination"
if len(exports) >= cap
else None,
}

View File

@ -1,261 +0,0 @@
"""Variables mixin for GhydraMCP.
Provides tools for querying global and function-local variables.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class VariablesMixin(GhydraMixinBase):
"""Mixin for variable operations.
Provides tools for:
- Listing global and function variables
- Querying local variables and parameters for a specific function
"""
@mcp_tool()
def variables_list(
self,
global_only: bool = False,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List variables with cursor-based pagination.
Args:
global_only: Only return global variables (default: False)
port: Ghidra instance port (optional)
page_size: Variables per page (default: 50, max: 500)
grep: Regex pattern to filter variable names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all variables without pagination
fields: Field names to keep (e.g. ['name', 'type', 'address']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of variables
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
cap = config.resource_caps.get("variables", 1000)
params = {"limit": cap}
if global_only:
params["global_only"] = "true"
response = self.safe_get(port, "variables", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
variables = simplified.get("result", [])
if not isinstance(variables, list):
variables = []
query_params = {
"tool": "variables_list",
"port": port,
"global_only": global_only,
"grep": grep,
}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=variables,
query_params=query_params,
tool_name="variables_list",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def functions_variables(
self,
address: str,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
fields: Optional[List[str]] = None,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""List local variables and parameters for a specific function.
Args:
address: Function address in hex format
port: Ghidra instance port (optional)
page_size: Variables per page (default: 50, max: 500)
grep: Regex pattern to filter variable names
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all variables without pagination
fields: Field names to keep (e.g. ['name', 'type', 'storage']). Reduces response size.
ctx: FastMCP context (auto-injected)
Returns:
Paginated list of function variables
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
response = self.safe_get(port, f"functions/{address}/variables")
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
variables = simplified.get("result", [])
if not isinstance(variables, list):
variables = []
query_params = {
"tool": "functions_variables",
"port": port,
"address": address,
"grep": grep,
}
session_id = self._get_session_id(ctx)
return self.filtered_paginate(
data=variables,
query_params=query_params,
tool_name="functions_variables",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
fields=fields,
)
@mcp_tool()
def variables_rename(
self,
function_address: str,
variable_name: str,
new_name: str,
new_type: Optional[str] = None,
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Rename a variable (and optionally retype) within a function.
Args:
function_address: Function address in hex format
variable_name: Current name of the variable
new_name: New name for the variable
new_type: New data type (optional, e.g. "int", "char*")
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not function_address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "function_address parameter is required",
},
}
if not variable_name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "variable_name parameter is required",
},
}
if not new_name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "new_name parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
from urllib.parse import quote
payload: dict = {"name": new_name}
if new_type:
payload["data_type"] = new_type
endpoint = f"functions/{function_address}/variables/{quote(variable_name)}"
response = self.safe_patch(port, endpoint, payload)
return self.simplify_response(response)
# Resources
@mcp_resource(uri="ghidra://instance/{port}/variables")
def resource_variables_list(self, port: Optional[int] = None) -> Dict[str, Any]:
"""MCP Resource: List variables (capped).
Args:
port: Ghidra instance port
Returns:
List of variables (capped)
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"error": str(e)}
config = get_config()
cap = config.resource_caps.get("variables", 1000)
response = self.safe_get(port, "variables", {"limit": cap})
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
variables = simplified.get("result", [])
if not isinstance(variables, list):
variables = []
return {
"variables": variables[:cap],
"count": len(variables),
"capped_at": cap if len(variables) >= cap else None,
"_hint": "Use variables_list() tool for full pagination"
if len(variables) >= cap
else None,
}

View File

@ -6,10 +6,10 @@ Provides tools for cross-reference (xref) operations.
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from fastmcp import Context from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
from ..config import get_config
from .base import GhydraMixinBase from .base import GhydraMixinBase
from ..config import get_config
class XrefsMixin(GhydraMixinBase): class XrefsMixin(GhydraMixinBase):

View File

@ -13,23 +13,17 @@ from typing import Optional
from fastmcp import FastMCP from fastmcp import FastMCP
from .config import GhydraConfig, get_config, set_config from .config import get_config, set_config, GhydraConfig
from .mixins import ( from .mixins import (
AnalysisMixin,
BookmarksMixin,
CursorsMixin,
DataMixin,
DataTypesMixin,
DockerMixin,
FunctionsMixin,
InstancesMixin, InstancesMixin,
MemoryMixin, FunctionsMixin,
NamespacesMixin, DataMixin,
SegmentsMixin,
StructsMixin, StructsMixin,
SymbolsMixin, AnalysisMixin,
VariablesMixin, MemoryMixin,
XrefsMixin, XrefsMixin,
CursorsMixin,
DockerMixin,
) )
@ -62,12 +56,6 @@ def create_server(
xrefs_mixin = XrefsMixin() xrefs_mixin = XrefsMixin()
cursors_mixin = CursorsMixin() cursors_mixin = CursorsMixin()
docker_mixin = DockerMixin() docker_mixin = DockerMixin()
symbols_mixin = SymbolsMixin()
segments_mixin = SegmentsMixin()
variables_mixin = VariablesMixin()
namespaces_mixin = NamespacesMixin()
bookmarks_mixin = BookmarksMixin()
datatypes_mixin = DataTypesMixin()
# Register all mixins with the server # Register all mixins with the server
# Each mixin registers its tools, resources, and prompts # Each mixin registers its tools, resources, and prompts
@ -80,12 +68,6 @@ def create_server(
xrefs_mixin.register_all(mcp) xrefs_mixin.register_all(mcp)
cursors_mixin.register_all(mcp) cursors_mixin.register_all(mcp)
docker_mixin.register_all(mcp) docker_mixin.register_all(mcp)
symbols_mixin.register_all(mcp)
segments_mixin.register_all(mcp)
variables_mixin.register_all(mcp)
namespaces_mixin.register_all(mcp)
bookmarks_mixin.register_all(mcp)
datatypes_mixin.register_all(mcp)
# Optional feedback collection # Optional feedback collection
cfg = get_config() cfg = get_config()
@ -105,37 +87,26 @@ def create_server(
def _periodic_discovery(interval: int = 30): def _periodic_discovery(interval: int = 30):
"""Background thread for periodic instance discovery. """Background thread for periodic instance discovery.
Uses a short timeout per port so a full scan completes quickly
even when most ports are unreachable.
Args: Args:
interval: Seconds between discovery attempts interval: Seconds between discovery attempts
""" """
import requests as _requests
from .mixins.base import GhydraMixinBase from .mixins.base import GhydraMixinBase
from .core.http_client import safe_get
config = get_config() config = get_config()
while True: while True:
time.sleep(interval) time.sleep(interval)
try: try:
# Quick scan — use discovery_timeout (0.5s), NOT request_timeout (30s) # Quick scan of common ports
for port in config.quick_discovery_range: for port in config.quick_discovery_range:
try: try:
url = f"http://{config.ghidra_host}:{port}/" response = safe_get(port, "")
resp = _requests.get(
url,
timeout=config.discovery_timeout,
headers={"Accept": "application/json"},
)
if resp.ok:
response = resp.json()
if response.get("success", False): if response.get("success", False):
with GhydraMixinBase._instances_lock: with GhydraMixinBase._instances_lock:
if port not in GhydraMixinBase._instances: if port not in GhydraMixinBase._instances:
GhydraMixinBase._instances[port] = { GhydraMixinBase._instances[port] = {
"url": url.rstrip("/"), "url": f"http://{config.ghidra_host}:{port}",
"project": response.get("project", ""), "project": response.get("project", ""),
"file": response.get("file", ""), "file": response.get("file", ""),
"discovered_at": time.time(), "discovered_at": time.time(),
@ -183,8 +154,8 @@ def main():
# Initial instance discovery # Initial instance discovery
print(f" Discovering Ghidra instances on {config.ghidra_host}...", file=sys.stderr) print(f" Discovering Ghidra instances on {config.ghidra_host}...", file=sys.stderr)
from .core.http_client import safe_get
from .mixins.base import GhydraMixinBase from .mixins.base import GhydraMixinBase
from .core.http_client import safe_get
found = 0 found = 0
for port in config.quick_discovery_range: for port in config.quick_discovery_range: