diff --git a/binaries/.gitkeep b/binaries/.gitkeep new file mode 100644 index 0000000..357e755 --- /dev/null +++ b/binaries/.gitkeep @@ -0,0 +1 @@ +# Add binaries here for analysis diff --git a/docker/GhydraMCPServer.py b/docker/GhydraMCPServer.py index d44a800..85e4456 100644 --- a/docker/GhydraMCPServer.py +++ b/docker/GhydraMCPServer.py @@ -305,6 +305,7 @@ ROUTES = [ ("GET", r"^/strings$", "handle_strings"), # Memory + ("GET", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_get_comment"), ("POST", r"^/memory/([^/]+)/comments/([^/]+)$", "handle_set_comment"), ("GET", r"^/memory/blocks$", "handle_memory_blocks"), ("GET", r"^/memory$", "handle_memory_read"), @@ -316,8 +317,26 @@ ROUTES = [ # Symbols ("GET", r"^/symbols/imports$", "handle_imports"), ("GET", r"^/symbols/exports$", "handle_exports"), + ("POST", r"^/symbols$", "handle_symbol_create"), + ("PATCH", r"^/symbols/([^/]+)$", "handle_symbol_rename"), + ("DELETE", r"^/symbols/([^/]+)$", "handle_symbol_delete"), ("GET", r"^/symbols$", "handle_symbols"), + # Variables + ("PATCH", r"^/functions/([^/]+)/variables/([^/]+)$", "handle_variable_rename"), + ("GET", r"^/variables$", "handle_variables"), + + # Bookmarks + ("POST", r"^/bookmarks$", "handle_bookmark_create"), + ("DELETE", r"^/bookmarks/([^/]+)$", "handle_bookmark_delete"), + ("GET", r"^/bookmarks$", "handle_bookmarks"), + + # Data types + ("POST", r"^/datatypes/enums$", "handle_enum_create"), + ("GET", r"^/datatypes/enums$", "handle_enums"), + ("POST", r"^/datatypes/typedefs$", "handle_typedef_create"), + ("GET", r"^/datatypes/typedefs$", "handle_typedefs"), + # Cross-references ("GET", r"^/xrefs$", "handle_xrefs"), @@ -734,26 +753,62 @@ class GhydraMCPHandler(HttpHandler): limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) name_filter = params.get("name") + name_contains = params.get("name_contains") + name_regex = params.get("name_matches_regex") + addr_filter = params.get("addr") grep_pattern = compile_grep(params) - functions = [] + # Compile name regex if provided + name_regex_pat = None + if name_regex: + try: + name_regex_pat = re.compile(name_regex, re.IGNORECASE) + except: + return {"success": False, "error": {"code": "INVALID_REGEX", "message": "Invalid regex: %s" % name_regex}} + fm = self.program.getFunctionManager() total = fm.getFunctionCount() + + # Single function lookup by address + if addr_filter: + func = self._find_function_at(addr_filter) + if not func: + return {"success": True, "result": [], "size": 0, "offset": 0, "limit": limit} + addr = str(func.getEntryPoint()) + return {"success": True, "result": [{ + "name": func.getName(), + "address": addr, + "signature": str(func.getSignature()), + "parameterCount": func.getParameterCount(), + "isThunk": func.isThunk(), + "_links": { + "self": make_link("/functions/%s" % addr), + "decompile": make_link("/functions/%s/decompile" % addr), + "disassembly": make_link("/functions/%s/disassembly" % addr), + }, + }], "size": 1, "offset": 0, "limit": limit} + + functions = [] count = 0 skipped = 0 for func in fm.getFunctions(True): if count >= limit: break - # Apply name filter - if name_filter and name_filter.lower() not in func.getName().lower(): + func_name = func.getName() + # Apply name filters + if name_filter and name_filter.lower() not in func_name.lower(): + continue + if name_contains and name_contains.lower() not in func_name.lower(): + continue + if name_regex_pat and not name_regex_pat.search(func_name): continue if skipped < offset: skipped += 1 continue addr = str(func.getEntryPoint()) item = { - "name": func.getName(), + "name": func_name, "address": addr, "signature": str(func.getSignature()), "parameterCount": func.getParameterCount(), @@ -1358,6 +1413,36 @@ class GhydraMCPHandler(HttpHandler): """List memory blocks (alias for /segments).""" return self.handle_segments(exchange) + def handle_get_comment(self, exchange, addr_str, comment_type): + """Get a comment at a specific address.""" + if not self.program: + return self._no_program() + + ct = COMMENT_TYPE_MAP.get(comment_type.lower()) + if ct is None: + return {"success": False, "error": { + "code": "INVALID_COMMENT_TYPE", + "message": "Invalid comment type: %s. Use: pre, post, eol, plate, repeatable" % comment_type + }} + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + listing = self.program.getListing() + cu = listing.getCodeUnitAt(addr) + if not cu: + cu = listing.getCodeUnitContaining(addr) + if not cu: + return {"success": True, "result": {"address": addr_str, "commentType": comment_type, "comment": None}} + + comment = cu.getComment(ct) + return {"success": True, "result": { + "address": addr_str, + "commentType": comment_type, + "comment": comment, + }} + except Exception as e: + return {"success": False, "error": {"code": "COMMENT_ERROR", "message": str(e)}} + def handle_set_comment(self, exchange, addr_str, comment_type): """Set a comment at a specific address.""" if not self.program: @@ -2112,6 +2197,518 @@ class GhydraMCPHandler(HttpHandler): except Exception as e: return {"success": False, "error": {"code": "DATAFLOW_ERROR", "message": str(e)}} + # ================================================================== + # Symbol CRUD Handlers + # ================================================================== + + def handle_symbol_create(self, exchange): + """POST /symbols - Create a new label/symbol.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + name = body.get("name", "") + address = body.get("address", "") + if not name or not address: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "Both 'name' and 'address' are required"}} + + try: + addr = self.program.getAddressFactory().getAddress(address) + st = self.program.getSymbolTable() + + def do_create(): + st.createLabel(addr, name, SourceType.USER_DEFINED) + + with_transaction(self.program, "Create symbol", do_create) + return {"success": True, "result": { + "name": name, + "address": address, + "message": "Symbol created successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}} + + def handle_symbol_rename(self, exchange, addr_str): + """PATCH /symbols/{address} - Rename primary symbol at address.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + new_name = body.get("name", "") + if not new_name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}} + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + st = self.program.getSymbolTable() + symbol = st.getPrimarySymbol(addr) + if not symbol: + return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}} + + old_name = symbol.getName() + + def do_rename(): + symbol.setName(new_name, SourceType.USER_DEFINED) + + with_transaction(self.program, "Rename symbol", do_rename) + return {"success": True, "result": { + "address": addr_str, + "oldName": old_name, + "newName": new_name, + "message": "Symbol renamed successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}} + + def handle_symbol_delete(self, exchange, addr_str): + """DELETE /symbols/{address} - Delete primary symbol at address.""" + if not self.program: + return self._no_program() + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + st = self.program.getSymbolTable() + symbol = st.getPrimarySymbol(addr) + if not symbol: + return {"success": False, "error": {"code": "NOT_FOUND", "message": "No symbol at address: %s" % addr_str}} + + name = symbol.getName() + + def do_delete(): + symbol.delete() + + with_transaction(self.program, "Delete symbol", do_delete) + return {"success": True, "result": { + "address": addr_str, + "name": name, + "message": "Symbol deleted successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "SYMBOL_ERROR", "message": str(e)}} + + # ================================================================== + # Variable Rename Handler + # ================================================================== + + def handle_variable_rename(self, exchange, addr_str, var_name): + """PATCH /functions/{address}/variables/{name} - Rename/retype a variable.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + new_name = body.get("name", "") + new_type = body.get("data_type") + + if not new_name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' parameter is required"}} + + try: + # URL-decode the variable name + from java.net import URLDecoder + decoded_name = URLDecoder.decode(var_name, "UTF-8") + + func = self._find_function_at(addr_str) + if not func: + return {"success": False, "error": {"code": "FUNCTION_NOT_FOUND", "message": "No function at address: %s" % addr_str}} + + # Search parameters and local variables + target_var = None + for param in func.getParameters(): + if param.getName() == decoded_name: + target_var = param + break + if not target_var: + for var in func.getAllVariables(): + if var.getName() == decoded_name: + target_var = var + break + + if not target_var: + return {"success": False, "error": {"code": "NOT_FOUND", "message": "Variable '%s' not found in function" % decoded_name}} + + old_name = target_var.getName() + + def do_rename(): + target_var.setName(new_name, SourceType.USER_DEFINED) + if new_type: + dtm = self.program.getDataTypeManager() + dt = resolve_data_type(dtm, new_type) + if dt: + target_var.setDataType(dt, SourceType.USER_DEFINED) + + with_transaction(self.program, "Rename variable", do_rename) + return {"success": True, "result": { + "function": addr_str, + "oldName": old_name, + "newName": new_name, + "message": "Variable renamed successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "VARIABLE_ERROR", "message": str(e)}} + + # ================================================================== + # Variables Handler + # ================================================================== + + def handle_variables(self, exchange): + """GET /variables - List global and function variables.""" + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + global_only = params.get("global_only", "false").lower() == "true" + search = params.get("search", "") + grep_pattern = compile_grep(params) + + variables = [] + count = 0 + skipped = 0 + + # Function variables (parameters + locals) + if not global_only: + fm = self.program.getFunctionManager() + for func in fm.getFunctions(True): + if count >= limit: + break + func_name = func.getName() + func_addr = str(func.getEntryPoint()) + + for param in func.getParameters(): + if count >= limit: + break + var_name = param.getName() + if search and search.lower() not in var_name.lower(): + continue + if skipped < offset: + skipped += 1 + continue + item = { + "name": var_name, + "type": str(param.getDataType()), + "storage": str(param.getVariableStorage()), + "scope": "parameter", + "function": func_name, + "functionAddress": func_addr, + } + if not grep_matches_item(item, grep_pattern): + continue + variables.append(item) + count += 1 + + for var in func.getLocalVariables(): + if count >= limit: + break + var_name = var.getName() + if search and search.lower() not in var_name.lower(): + continue + if skipped < offset: + skipped += 1 + continue + item = { + "name": var_name, + "type": str(var.getDataType()), + "storage": str(var.getVariableStorage()), + "scope": "local", + "function": func_name, + "functionAddress": func_addr, + } + if not grep_matches_item(item, grep_pattern): + continue + variables.append(item) + count += 1 + + # Global variables (defined data with symbol names) + listing = self.program.getListing() + st = self.program.getSymbolTable() + for data in listing.getDefinedData(True): + if count >= limit: + break + addr = data.getAddress() + symbol = st.getPrimarySymbol(addr) + if not symbol: + continue + sym_name = symbol.getName() + # Skip auto-generated names + if sym_name.startswith("DAT_") or sym_name.startswith("s_"): + continue + if search and search.lower() not in sym_name.lower(): + continue + if skipped < offset: + skipped += 1 + continue + item = { + "name": sym_name, + "address": str(addr), + "type": str(data.getDataType()), + "scope": "global", + "size": data.getLength(), + } + if not grep_matches_item(item, grep_pattern): + continue + variables.append(item) + count += 1 + + return {"success": True, "result": variables, "offset": offset, "limit": limit} + + # ================================================================== + # Bookmarks Handlers + # ================================================================== + + def handle_bookmarks(self, exchange): + """GET /bookmarks - List bookmarks with optional filtering.""" + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + type_filter = params.get("type") + category_filter = params.get("category") + grep_pattern = compile_grep(params) + + bm = self.program.getBookmarkManager() + bookmarks = [] + count = 0 + skipped = 0 + + # Get bookmark types to iterate + if type_filter: + bm_types = [type_filter] + else: + bm_types = [str(t) for t in bm.getBookmarkTypes()] + + for btype in bm_types: + if count >= limit: + break + try: + it = bm.getBookmarksIterator(btype) + except: + continue + while it.hasNext() and count < limit: + bookmark = it.next() + if category_filter and bookmark.getCategory() != category_filter: + continue + if skipped < offset: + skipped += 1 + continue + item = { + "address": str(bookmark.getAddress()), + "type": bookmark.getTypeString(), + "category": bookmark.getCategory(), + "comment": bookmark.getComment(), + } + if not grep_matches_item(item, grep_pattern): + continue + bookmarks.append(item) + count += 1 + + return {"success": True, "result": bookmarks, "offset": offset, "limit": limit} + + def handle_bookmark_create(self, exchange): + """POST /bookmarks - Create a bookmark.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + address = body.get("address", "") + btype = body.get("type", "Note") + category = body.get("category", "") + comment = body.get("comment", "") + + if not address: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'address' is required"}} + + try: + addr = self.program.getAddressFactory().getAddress(address) + bm = self.program.getBookmarkManager() + + def do_create(): + bm.setBookmark(addr, btype, category, comment) + + with_transaction(self.program, "Create bookmark", do_create) + return {"success": True, "result": { + "address": address, + "type": btype, + "category": category, + "comment": comment, + "message": "Bookmark created successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}} + + def handle_bookmark_delete(self, exchange, addr_str): + """DELETE /bookmarks/{address} - Delete all bookmarks at address.""" + if not self.program: + return self._no_program() + + try: + addr = self.program.getAddressFactory().getAddress(addr_str) + bm = self.program.getBookmarkManager() + removed = [] + + def do_delete(): + for bookmark in list(bm.getBookmarks(addr)): + removed.append(bookmark.getTypeString()) + bookmark.remove() if hasattr(bookmark, 'remove') else bm.removeBookmark(bookmark) + + with_transaction(self.program, "Delete bookmarks", do_delete) + return {"success": True, "result": { + "address": addr_str, + "removedTypes": removed, + "count": len(removed), + "message": "Bookmarks deleted successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "BOOKMARK_ERROR", "message": str(e)}} + + # ================================================================== + # Enum Handlers + # ================================================================== + + def handle_enums(self, exchange): + """GET /datatypes/enums - List enum data types.""" + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + grep_pattern = compile_grep(params) + + from ghidra.program.model.data import Enum as GhidraEnum + + dtm = self.program.getDataTypeManager() + enums = [] + count = 0 + skipped = 0 + + for dt in dtm.getAllDataTypes(): + if count >= limit: + break + if not isinstance(dt, GhidraEnum): + continue + if skipped < offset: + skipped += 1 + continue + + # Get enum members + members = [] + for name in dt.getNames(): + members.append({"name": name, "value": dt.getValue(name)}) + + item = { + "name": dt.getName(), + "category": str(dt.getCategoryPath()), + "size": dt.getLength(), + "members": members, + } + if not grep_matches_item(item, grep_pattern): + continue + enums.append(item) + count += 1 + + return {"success": True, "result": enums, "offset": offset, "limit": limit} + + def handle_enum_create(self, exchange): + """POST /datatypes/enums - Create a new enum.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + name = body.get("name", "") + size = int(body.get("size", 4)) + + if not name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' is required"}} + + try: + from ghidra.program.model.data import EnumDataType, CategoryPath + + dtm = self.program.getDataTypeManager() + new_enum = EnumDataType(name, size) + + def do_create(): + dtm.addDataType(new_enum, None) + + with_transaction(self.program, "Create enum", do_create) + return {"success": True, "result": { + "name": name, + "size": size, + "message": "Enum created successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "ENUM_ERROR", "message": str(e)}} + + # ================================================================== + # Typedef Handlers + # ================================================================== + + def handle_typedefs(self, exchange): + """GET /datatypes/typedefs - List typedef data types.""" + if not self.program: + return self._no_program() + params = parse_query_params(exchange) + limit = parse_int(params.get("limit"), 100) + offset = parse_int(params.get("offset"), 0) + grep_pattern = compile_grep(params) + + from ghidra.program.model.data import TypeDef + + dtm = self.program.getDataTypeManager() + typedefs = [] + count = 0 + skipped = 0 + + for dt in dtm.getAllDataTypes(): + if count >= limit: + break + if not isinstance(dt, TypeDef): + continue + if skipped < offset: + skipped += 1 + continue + item = { + "name": dt.getName(), + "category": str(dt.getCategoryPath()), + "baseType": dt.getBaseDataType().getName() if dt.getBaseDataType() else None, + "size": dt.getLength(), + } + if not grep_matches_item(item, grep_pattern): + continue + typedefs.append(item) + count += 1 + + return {"success": True, "result": typedefs, "offset": offset, "limit": limit} + + def handle_typedef_create(self, exchange): + """POST /datatypes/typedefs - Create a new typedef.""" + if not self.program: + return self._no_program() + body = parse_json_body(exchange) + name = body.get("name", "") + base_type_name = body.get("base_type", "") + + if not name or not base_type_name: + return {"success": False, "error": {"code": "MISSING_PARAMETER", "message": "'name' and 'base_type' are required"}} + + try: + from ghidra.program.model.data import TypedefDataType + + dtm = self.program.getDataTypeManager() + + # Use the shared resolver which handles builtins + path lookups + base_dt = resolve_data_type(dtm, base_type_name) + if not base_dt: + return {"success": False, "error": {"code": "NOT_FOUND", "message": "Base type not found: %s" % base_type_name}} + + new_typedef = TypedefDataType(name, base_dt) + + def do_create(): + dtm.addDataType(new_typedef, None) + + with_transaction(self.program, "Create typedef", do_create) + return {"success": True, "result": { + "name": name, + "baseType": base_type_name, + "message": "Typedef created successfully", + }} + except Exception as e: + return {"success": False, "error": {"code": "TYPEDEF_ERROR", "message": str(e)}} + # ================================================================== # Legacy Compatibility # ================================================================== diff --git a/src/ghydramcp/config.py b/src/ghydramcp/config.py index 85076a2..4228a13 100644 --- a/src/ghydramcp/config.py +++ b/src/ghydramcp/config.py @@ -97,6 +97,14 @@ class GhydraConfig: "data": 1000, "structs": 500, "xrefs": 500, + "symbols": 1000, + "segments": 500, + "variables": 1000, + "namespaces": 500, + "classes": 500, + "bookmarks": 1000, + "enums": 500, + "typedefs": 500, }) def __post_init__(self): diff --git a/src/ghydramcp/core/__init__.py b/src/ghydramcp/core/__init__.py index 74c6cec..72901c8 100644 --- a/src/ghydramcp/core/__init__.py +++ b/src/ghydramcp/core/__init__.py @@ -3,38 +3,38 @@ Contains HTTP client, pagination, progress reporting, and logging utilities. """ +from .filtering import ( + apply_grep, + estimate_and_guard, + project_fields, +) from .http_client import ( + get_instance_url, + safe_delete, safe_get, + safe_patch, safe_post, safe_put, - safe_patch, - safe_delete, simplify_response, - get_instance_url, +) +from .logging import ( + log_debug, + log_error, + log_info, + log_warning, ) from .pagination import ( CursorManager, CursorState, - paginate_response, - get_cursor_manager, estimate_tokens, + get_cursor_manager, + paginate_response, ) from .progress import ( ProgressReporter, report_progress, report_step, ) -from .filtering import ( - project_fields, - apply_grep, - estimate_and_guard, -) -from .logging import ( - log_info, - log_debug, - log_warning, - log_error, -) __all__ = [ # HTTP client diff --git a/src/ghydramcp/core/filtering.py b/src/ghydramcp/core/filtering.py index f735881..e845fa6 100644 --- a/src/ghydramcp/core/filtering.py +++ b/src/ghydramcp/core/filtering.py @@ -11,7 +11,6 @@ from typing import Any, Dict, Optional from ..config import get_config - # Token estimation (same ratio as pagination.py) TOKEN_ESTIMATION_RATIO = 4.0 diff --git a/src/ghydramcp/core/http_client.py b/src/ghydramcp/core/http_client.py index 08b1f81..b9c1620 100644 --- a/src/ghydramcp/core/http_client.py +++ b/src/ghydramcp/core/http_client.py @@ -12,7 +12,6 @@ import requests from ..config import get_config - # Allowed origins for CORS-like validation ALLOWED_ORIGINS = { "http://localhost", diff --git a/src/ghydramcp/core/logging.py b/src/ghydramcp/core/logging.py index b595472..3666303 100644 --- a/src/ghydramcp/core/logging.py +++ b/src/ghydramcp/core/logging.py @@ -5,7 +5,7 @@ client-visible logging when available, with fallback to standard logging. """ import logging -from typing import Optional, TYPE_CHECKING +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from mcp.server.fastmcp import Context diff --git a/src/ghydramcp/core/pagination.py b/src/ghydramcp/core/pagination.py index 2eea804..3af1d47 100644 --- a/src/ghydramcp/core/pagination.py +++ b/src/ghydramcp/core/pagination.py @@ -14,8 +14,7 @@ from threading import Lock from typing import Any, Dict, List, Optional, Tuple from ..config import get_config -from .filtering import project_fields, estimate_and_guard - +from .filtering import estimate_and_guard, project_fields # ReDoS Protection Configuration MAX_GREP_PATTERN_LENGTH = 500 diff --git a/src/ghydramcp/core/progress.py b/src/ghydramcp/core/progress.py index 5a63cd8..0f5d624 100644 --- a/src/ghydramcp/core/progress.py +++ b/src/ghydramcp/core/progress.py @@ -4,7 +4,7 @@ Provides async progress reporting using FastMCP's Context for real-time progress notifications to MCP clients. """ -from typing import Optional, TYPE_CHECKING +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from mcp.server.fastmcp import Context diff --git a/src/ghydramcp/mixins/__init__.py b/src/ghydramcp/mixins/__init__.py index 919112b..a839bcd 100644 --- a/src/ghydramcp/mixins/__init__.py +++ b/src/ghydramcp/mixins/__init__.py @@ -4,16 +4,22 @@ Domain-specific mixins that organize tools, resources, and prompts by functional Uses FastMCP's contrib.mcp_mixin pattern for clean modular organization. """ -from .base import GhydraMixinBase -from .instances import InstancesMixin -from .functions import FunctionsMixin -from .data import DataMixin -from .structs import StructsMixin from .analysis import AnalysisMixin -from .memory import MemoryMixin -from .xrefs import XrefsMixin +from .base import GhydraMixinBase +from .bookmarks import BookmarksMixin from .cursors import CursorsMixin +from .data import DataMixin +from .datatypes import DataTypesMixin from .docker import DockerMixin +from .functions import FunctionsMixin +from .instances import InstancesMixin +from .memory import MemoryMixin +from .namespaces import NamespacesMixin +from .segments import SegmentsMixin +from .structs import StructsMixin +from .symbols import SymbolsMixin +from .variables import VariablesMixin +from .xrefs import XrefsMixin __all__ = [ "GhydraMixinBase", @@ -26,4 +32,10 @@ __all__ = [ "XrefsMixin", "CursorsMixin", "DockerMixin", + "SymbolsMixin", + "SegmentsMixin", + "VariablesMixin", + "NamespacesMixin", + "BookmarksMixin", + "DataTypesMixin", ] diff --git a/src/ghydramcp/mixins/analysis.py b/src/ghydramcp/mixins/analysis.py index 1af360a..fef6055 100644 --- a/src/ghydramcp/mixins/analysis.py +++ b/src/ghydramcp/mixins/analysis.py @@ -8,8 +8,8 @@ from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class AnalysisMixin(GhydraMixinBase): @@ -277,6 +277,40 @@ class AnalysisMixin(GhydraMixinBase): response = self.safe_get(port, "function") return self.simplify_response(response) + @mcp_tool() + def comments_get( + self, + address: str, + comment_type: str = "plate", + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Get a comment at the specified address. + + Args: + address: Memory address in hex format + comment_type: "plate", "pre", "post", "eol", "repeatable" + port: Ghidra instance port (optional) + + Returns: + Comment text and metadata + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + response = self.safe_get(port, f"memory/{address}/comments/{comment_type}") + return self.simplify_response(response) + @mcp_tool() def comments_set( self, diff --git a/src/ghydramcp/mixins/base.py b/src/ghydramcp/mixins/base.py index 6bbf048..e191026 100644 --- a/src/ghydramcp/mixins/base.py +++ b/src/ghydramcp/mixins/base.py @@ -11,9 +11,16 @@ from fastmcp import Context from fastmcp.contrib.mcp_mixin import MCPMixin from ..config import get_config -from ..core.http_client import safe_get, safe_post, safe_put, safe_patch, safe_delete, simplify_response +from ..core.http_client import ( + safe_delete, + safe_get, + safe_patch, + safe_post, + safe_put, + simplify_response, +) +from ..core.logging import log_debug, log_error, log_info, log_warning from ..core.pagination import paginate_response -from ..core.logging import log_info, log_debug, log_warning, log_error class GhydraMixinBase(MCPMixin): diff --git a/src/ghydramcp/mixins/bookmarks.py b/src/ghydramcp/mixins/bookmarks.py new file mode 100644 index 0000000..aece74b --- /dev/null +++ b/src/ghydramcp/mixins/bookmarks.py @@ -0,0 +1,171 @@ +"""Bookmarks mixin for GhydraMCP. + +Provides tools for managing Ghidra bookmarks (annotations at addresses). +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class BookmarksMixin(GhydraMixinBase): + """Mixin for bookmark operations. + + Provides tools for: + - Listing bookmarks with type/category filtering + - Creating bookmarks at addresses + - Deleting bookmarks + """ + + @mcp_tool() + def bookmarks_list( + self, + type: Optional[str] = None, + category: Optional[str] = None, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List bookmarks with optional type/category filtering. + + Args: + type: Filter by bookmark type (e.g. "Note", "Warning", "Error", "Info") + category: Filter by bookmark category + port: Ghidra instance port (optional) + page_size: Bookmarks per page (default: 50, max: 500) + grep: Regex pattern to filter bookmark comments + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all bookmarks without pagination + fields: Field names to keep (e.g. ['address', 'type', 'comment']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of bookmarks + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("bookmarks", 1000) + params: Dict[str, Any] = {"limit": cap} + if type: + params["type"] = type + if category: + params["category"] = category + + response = self.safe_get(port, "bookmarks", params) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + bookmarks = simplified.get("result", []) + if not isinstance(bookmarks, list): + bookmarks = [] + + query_params = { + "tool": "bookmarks_list", + "port": port, + "type": type, + "category": category, + "grep": grep, + } + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=bookmarks, + query_params=query_params, + tool_name="bookmarks_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def bookmarks_create( + self, + address: str, + type: str = "Note", + category: str = "", + comment: str = "", + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Create a bookmark at the specified address. + + Args: + address: Memory address in hex format + type: Bookmark type (default: "Note"). Common types: Note, Warning, Error, Info + category: Bookmark category (optional grouping string) + comment: Bookmark comment text + port: Ghidra instance port (optional) + + Returns: + Created bookmark information + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = { + "address": address, + "type": type, + "category": category, + "comment": comment, + } + response = self.safe_post(port, "bookmarks", payload) + return self.simplify_response(response) + + @mcp_tool() + def bookmarks_delete( + self, + address: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Delete all bookmarks at the specified address. + + Args: + address: Memory address in hex format + port: Ghidra instance port (optional) + + Returns: + Operation result + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + response = self.safe_delete(port, f"bookmarks/{address}") + return self.simplify_response(response) diff --git a/src/ghydramcp/mixins/cursors.py b/src/ghydramcp/mixins/cursors.py index a1cb68c..0f4324e 100644 --- a/src/ghydramcp/mixins/cursors.py +++ b/src/ghydramcp/mixins/cursors.py @@ -8,8 +8,8 @@ from typing import Any, Dict, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool -from .base import GhydraMixinBase from ..core.pagination import get_cursor_manager +from .base import GhydraMixinBase class CursorsMixin(GhydraMixinBase): diff --git a/src/ghydramcp/mixins/data.py b/src/ghydramcp/mixins/data.py index dde51ec..7c43c5c 100644 --- a/src/ghydramcp/mixins/data.py +++ b/src/ghydramcp/mixins/data.py @@ -6,10 +6,10 @@ Provides tools for data items and strings operations. from typing import Any, Dict, List, Optional from fastmcp import Context -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class DataMixin(GhydraMixinBase): diff --git a/src/ghydramcp/mixins/datatypes.py b/src/ghydramcp/mixins/datatypes.py new file mode 100644 index 0000000..79c61d8 --- /dev/null +++ b/src/ghydramcp/mixins/datatypes.py @@ -0,0 +1,217 @@ +"""Data types mixin for GhydraMCP. + +Provides tools for managing enum and typedef data types. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class DataTypesMixin(GhydraMixinBase): + """Mixin for enum and typedef data type operations. + + Provides tools for: + - Listing and creating enum data types + - Listing and creating typedef data types + """ + + # --- Enums --- + + @mcp_tool() + def enums_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List enum data types with their members. + + Args: + port: Ghidra instance port (optional) + page_size: Enums per page (default: 50, max: 500) + grep: Regex pattern to filter enum names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all enums without pagination + fields: Field names to keep (e.g. ['name', 'size', 'members']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of enum data types + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("enums", 500) + response = self.safe_get(port, "datatypes/enums", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + enums = simplified.get("result", []) + if not isinstance(enums, list): + enums = [] + + query_params = {"tool": "enums_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=enums, + query_params=query_params, + tool_name="enums_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def enums_create( + self, + name: str, + size: int = 4, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Create a new enum data type. + + Args: + name: Name for the new enum + size: Size in bytes (default: 4) + port: Ghidra instance port (optional) + + Returns: + Created enum information + """ + if not name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "name parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = {"name": name, "size": size} + response = self.safe_post(port, "datatypes/enums", payload) + return self.simplify_response(response) + + # --- Typedefs --- + + @mcp_tool() + def typedefs_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List typedef data types. + + Args: + port: Ghidra instance port (optional) + page_size: Typedefs per page (default: 50, max: 500) + grep: Regex pattern to filter typedef names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all typedefs without pagination + fields: Field names to keep (e.g. ['name', 'base_type']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of typedef data types + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("typedefs", 500) + response = self.safe_get(port, "datatypes/typedefs", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + typedefs = simplified.get("result", []) + if not isinstance(typedefs, list): + typedefs = [] + + query_params = {"tool": "typedefs_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=typedefs, + query_params=query_params, + tool_name="typedefs_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def typedefs_create( + self, + name: str, + base_type: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Create a new typedef data type. + + Args: + name: Name for the new typedef + base_type: Name of the base data type (e.g. "int", "uint32_t", "char*") + port: Ghidra instance port (optional) + + Returns: + Created typedef information + """ + if not name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "name parameter is required", + }, + } + if not base_type: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "base_type parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = {"name": name, "base_type": base_type} + response = self.safe_post(port, "datatypes/typedefs", payload) + return self.simplify_response(response) diff --git a/src/ghydramcp/mixins/docker.py b/src/ghydramcp/mixins/docker.py index 51add07..ded88de 100644 --- a/src/ghydramcp/mixins/docker.py +++ b/src/ghydramcp/mixins/docker.py @@ -16,17 +16,14 @@ import subprocess import time import uuid from pathlib import Path -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool -from ..config import get_config, get_docker_config - - -# Port pool configuration +# Port pool configuration (32 ports should handle many concurrent sessions) PORT_POOL_START = 8192 -PORT_POOL_END = 8199 +PORT_POOL_END = 8223 PORT_LOCK_DIR = Path("/tmp/ghydramcp-ports") @@ -207,7 +204,7 @@ class DockerMixin(MCPMixin): with the GhydraMCP plugin pre-installed. Supports multi-process environments with: - - Dynamic port allocation from a pool (8192-8199) + - Dynamic port allocation from a pool (8192-8223) - Session-scoped container naming with UUIDs - Docker label-based tracking for cross-process visibility - Automatic cleanup of orphaned containers @@ -513,7 +510,6 @@ class DockerMixin(MCPMixin): async def docker_start( self, binary_path: str, - port: Optional[int] = None, memory: str = "2G", name: Optional[str] = None, ctx: Optional[Context] = None, @@ -522,15 +518,14 @@ class DockerMixin(MCPMixin): This creates a new Ghidra instance in Docker with the GhydraMCP plugin pre-installed. The binary will be imported and analyzed, - then the HTTP API will be available on the specified port. + then the HTTP API will be available. - If no port is specified, one will be automatically allocated from - the pool (8192-8199). Container names are auto-generated with the - session ID to ensure uniqueness across processes. + Ports are automatically allocated from the pool (8192-8223) to + prevent conflicts between concurrent sessions. Container names + are auto-generated with the session ID to ensure uniqueness. Args: binary_path: Path to the binary file to analyze - port: Port to expose the HTTP API (auto-allocated if not specified) memory: Max JVM heap memory (default: 2G) name: Container name (auto-generated if not specified) @@ -545,16 +540,13 @@ class DockerMixin(MCPMixin): if not binary_file.exists(): return {"error": f"Binary not found: {binary_path}"} - # Allocate port from pool if not specified - allocated_port = False + # Always allocate from pool to prevent conflicts between sessions + port = self._port_pool.allocate(self.session_id) if port is None: - port = self._port_pool.allocate(self.session_id) - if port is None: - return { - "error": "Port pool exhausted (8192-8199). Stop some containers first.", - "allocated_ports": self._port_pool.get_allocated_ports(), - } - allocated_port = True + return { + "error": "Port pool exhausted (8192-8223). Stop some containers first.", + "allocated_ports": self._port_pool.get_allocated_ports(), + } # Generate container name if not specified if name is None: @@ -569,8 +561,7 @@ class DockerMixin(MCPMixin): ["ps", "-a", "-q", "-f", f"name=^{name}$"], check=False ) if check_result.stdout.strip(): - if allocated_port: - self._port_pool.release(port) + self._port_pool.release(port) return { "error": f"Container '{name}' already exists. Stop it first with docker_stop." } @@ -580,8 +571,7 @@ class DockerMixin(MCPMixin): ["ps", "-q", "-f", f"publish={port}"], check=False ) if port_check.stdout.strip(): - if allocated_port: - self._port_pool.release(port) + self._port_pool.release(port) return { "error": f"Port {port} is already in use by another container" } @@ -619,7 +609,6 @@ class DockerMixin(MCPMixin): "port": port, "binary": str(binary_file), "memory": memory, - "allocated_port": allocated_port, } return { @@ -638,8 +627,7 @@ class DockerMixin(MCPMixin): } except subprocess.CalledProcessError as e: - if allocated_port: - self._port_pool.release(port) + self._port_pool.release(port) return {"error": f"Failed to start container: {e.stderr or e.stdout}"} @mcp_tool( @@ -651,6 +639,10 @@ class DockerMixin(MCPMixin): ) -> Dict[str, Any]: """Stop a GhydraMCP Docker container. + For safety, this will only stop containers that belong to the current + MCP session. Attempting to stop another session's container will fail + with an error explaining whose container it is. + Args: name_or_id: Container name or ID remove: Also remove the container (default: True) @@ -661,18 +653,34 @@ class DockerMixin(MCPMixin): if not self._check_docker_available(): return {"error": "Docker is not available on this system"} - # Find the container to get its port for pool release + # Get container's session and port labels for validation container_port = None + container_session = None try: inspect_result = self._run_docker_cmd( - ["inspect", "--format", "{{index .Config.Labels \"" + self.LABEL_PREFIX + ".port\"}}", name_or_id], + [ + "inspect", + "--format", + "{{index .Config.Labels \"" + self.LABEL_PREFIX + ".port\"}}|{{index .Config.Labels \"" + self.LABEL_PREFIX + ".session\"}}", + name_or_id, + ], check=False, ) - if inspect_result.stdout.strip().isdigit(): - container_port = int(inspect_result.stdout.strip()) + parts = inspect_result.stdout.strip().split("|") + if len(parts) >= 2: + if parts[0].isdigit(): + container_port = int(parts[0]) + container_session = parts[1] if parts[1] else None except Exception: pass + # Session validation: only allow stopping own containers + if container_session and container_session != self.session_id: + return { + "error": f"Cannot stop container '{name_or_id}' - it belongs to session '{container_session}', not this session '{self.session_id}'.", + "hint": "Each MCP session can only stop its own containers for safety.", + } + try: # Stop the container self._run_docker_cmd(["stop", name_or_id]) @@ -807,25 +815,19 @@ class DockerMixin(MCPMixin): except subprocess.CalledProcessError as e: return {"error": f"Build failed: {e.stderr or e.stdout}"} - @mcp_tool( - name="docker_health", - description="Check if a GhydraMCP container's API is responding", - ) - async def docker_health( - self, port: int = 8192, timeout: float = 5.0, ctx: Optional[Context] = None - ) -> Dict[str, Any]: - """Check if a GhydraMCP container's API is healthy. + def _sync_health_check(self, port: int, timeout: float) -> Dict[str, Any]: + """Synchronous health check (runs in thread to avoid blocking event loop). Args: - port: API port to check (default: 8192) + port: API port to check timeout: Request timeout in seconds Returns: - Health status and API info if available + Health status dict """ - import urllib.request - import urllib.error import json as json_module + import urllib.error + import urllib.request url = f"http://localhost:{port}/" @@ -854,6 +856,27 @@ class DockerMixin(MCPMixin): "error": str(e), } + @mcp_tool( + name="docker_health", + description="Check if a GhydraMCP container's API is responding", + ) + async def docker_health( + self, port: int = 8192, timeout: float = 5.0, ctx: Optional[Context] = None + ) -> Dict[str, Any]: + """Check if a GhydraMCP container's API is healthy. + + Args: + port: API port to check (default: 8192) + timeout: Request timeout in seconds + + Returns: + Health status and API info if available + """ + loop = asyncio.get_event_loop() + return await loop.run_in_executor( + None, self._sync_health_check, port, timeout + ) + @mcp_tool( name="docker_wait", description="Wait for a GhydraMCP container to become healthy", @@ -902,57 +925,51 @@ class DockerMixin(MCPMixin): async def docker_auto_start( self, binary_path: str, - port: Optional[int] = None, - wait: bool = True, + wait: bool = False, timeout: float = 300.0, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Automatically start a Docker container with intelligent port allocation. This is the main entry point for automatic Docker management: - 1. Checks if a Ghidra instance is already running (on specified or any pooled port) + 1. Checks if a Ghidra instance with the SAME binary is already running 2. If not, allocates a port from the pool and starts a new container 3. Optionally waits for the container to become healthy 4. Returns connection info for the instance - When port is not specified, the system will: - - First check all pooled ports (8192-8199) for an existing healthy instance - - If none found, allocate a new port from the pool + Ports are auto-allocated from the pool (8192-8223) to prevent + conflicts between concurrent sessions. Args: binary_path: Path to the binary to analyze - port: Specific port for the HTTP API (auto-allocated if not specified) - wait: Wait for container to be ready (default: True) + wait: Wait for container to be ready (default: False, use docker_wait separately) timeout: Max wait time in seconds (default: 300) Returns: Instance connection info with session ID and port details """ - # If port is specified, check that specific port - if port is not None: - health = await self.docker_health(port=port, ctx=ctx) - if health.get("healthy"): + import os + + requested_name = os.path.basename(binary_path) + + def _is_same_binary(health_program: str) -> bool: + """Check if a running instance has the same binary loaded.""" + if not health_program: + return False + return os.path.basename(health_program) == requested_name + + # Check all pooled ports for an instance with the SAME binary + for check_port in range(PORT_POOL_START, PORT_POOL_END + 1): + health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx) + if health.get("healthy") and _is_same_binary(health.get("program", "")): return { "source": "existing", "session_id": self.session_id, - "port": port, - "api_url": f"http://localhost:{port}/", + "port": check_port, + "api_url": f"http://localhost:{check_port}/", "program": health.get("program"), - "message": "Using existing Ghidra instance", + "message": f"Found existing Ghidra instance on port {check_port}", } - else: - # Check all pooled ports for an existing instance - for check_port in range(PORT_POOL_START, PORT_POOL_END + 1): - health = await self.docker_health(port=check_port, timeout=1.0, ctx=ctx) - if health.get("healthy"): - return { - "source": "existing", - "session_id": self.session_id, - "port": check_port, - "api_url": f"http://localhost:{check_port}/", - "program": health.get("program"), - "message": f"Found existing Ghidra instance on port {check_port}", - } # Check if Docker is available status = await self.docker_status(ctx=ctx) @@ -970,9 +987,9 @@ class DockerMixin(MCPMixin): ) } - # Start a new container (port will be auto-allocated if not specified) + # Start a new container (port auto-allocated from pool) start_result = await self.docker_start( - binary_path=binary_path, port=port, ctx=ctx + binary_path=binary_path, ctx=ctx ) if not start_result.get("success"): @@ -1021,7 +1038,7 @@ class DockerMixin(MCPMixin): ) async def docker_cleanup( self, - session_only: bool = False, + session_only: bool = True, max_age_hours: float = 24.0, dry_run: bool = False, ctx: Optional[Context] = None, @@ -1031,8 +1048,12 @@ class DockerMixin(MCPMixin): This helps recover from crashed processes that left containers or port locks behind. + By default, only cleans containers from the current session to prevent + accidentally removing another agent's work. Set session_only=False + (with caution) to clean all GhydraMCP containers. + Args: - session_only: Only clean up containers from this session + session_only: Only clean up containers from this session (default: True for safety) max_age_hours: Max age for orphaned containers (default: 24 hours) dry_run: If True, only report what would be cleaned up diff --git a/src/ghydramcp/mixins/functions.py b/src/ghydramcp/mixins/functions.py index 3eb129b..94914dd 100644 --- a/src/ghydramcp/mixins/functions.py +++ b/src/ghydramcp/mixins/functions.py @@ -7,10 +7,10 @@ from typing import Any, Dict, List, Optional from urllib.parse import quote from fastmcp import Context -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class FunctionsMixin(GhydraMixinBase): @@ -28,6 +28,9 @@ class FunctionsMixin(GhydraMixinBase): @mcp_tool() def functions_list( self, + name_contains: Optional[str] = None, + name_regex: Optional[str] = None, + address: Optional[str] = None, port: Optional[int] = None, page_size: int = 50, grep: Optional[str] = None, @@ -36,12 +39,15 @@ class FunctionsMixin(GhydraMixinBase): fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: - """List functions with cursor-based pagination. + """List functions with cursor-based pagination and server-side filtering. Args: + name_contains: Server-side substring filter on function name (faster than grep for large binaries) + name_regex: Server-side regex filter on function name + address: Filter by exact function address (hex) port: Ghidra instance port (optional) page_size: Functions per page (default: 50, max: 500) - grep: Regex pattern to filter function names + grep: Client-side regex pattern to filter function names grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all functions without pagination fields: Field names to keep (e.g. ['name', 'address']). Reduces response size. @@ -56,7 +62,15 @@ class FunctionsMixin(GhydraMixinBase): return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} config = get_config() - response = self.safe_get(port, "functions", {"limit": 10000}) + params = {"limit": 10000} + if name_contains: + params["name_contains"] = name_contains + if name_regex: + params["name_matches_regex"] = name_regex + if address: + params["addr"] = address + + response = self.safe_get(port, "functions", params) simplified = self.simplify_response(response) if not simplified.get("success", True): @@ -66,7 +80,14 @@ class FunctionsMixin(GhydraMixinBase): if not isinstance(functions, list): functions = [] - query_params = {"tool": "functions_list", "port": port, "grep": grep} + query_params = { + "tool": "functions_list", + "port": port, + "name_contains": name_contains, + "name_regex": name_regex, + "address": address, + "grep": grep, + } session_id = self._get_session_id(ctx) return self.filtered_paginate( @@ -467,7 +488,7 @@ class FunctionsMixin(GhydraMixinBase): "functions": functions[:cap], "count": len(functions), "capped_at": cap if len(functions) >= cap else None, - "_hint": f"Use functions_list() tool for full pagination" if len(functions) >= cap else None, + "_hint": "Use functions_list() tool for full pagination" if len(functions) >= cap else None, } @mcp_resource(uri="ghidra://instance/{port}/function/decompile/address/{address}") diff --git a/src/ghydramcp/mixins/instances.py b/src/ghydramcp/mixins/instances.py index e92f4d4..4b93269 100644 --- a/src/ghydramcp/mixins/instances.py +++ b/src/ghydramcp/mixins/instances.py @@ -4,13 +4,12 @@ Provides tools for discovering, registering, and managing Ghidra instances. """ import time -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config -from ..core.http_client import safe_get +from .base import GhydraMixinBase class InstancesMixin(GhydraMixinBase): @@ -167,13 +166,20 @@ class InstancesMixin(GhydraMixinBase): Returns: Confirmation message with instance details """ + # Register lazily without blocking HTTP calls. + # If the instance is unknown, create a stub entry — the first + # actual tool call (functions_list, etc.) will validate the + # connection and fail fast with a clear error if unreachable. with self._instances_lock: - needs_register = port not in self._instances - - if needs_register: - result = self.register_instance(port) - if "Failed" in result or "Error" in result: - return result + if port not in self._instances: + config = get_config() + self._instances[port] = { + "url": f"http://{config.ghidra_host}:{port}", + "project": "", + "file": "", + "registered_at": time.time(), + "lazy": True, + } self.set_current_port(port) @@ -211,6 +217,25 @@ class InstancesMixin(GhydraMixinBase): return {"port": port, "status": "registered but no details available"} + @mcp_tool() + def program_info(self, port: Optional[int] = None) -> Dict[str, Any]: + """Get full program metadata (architecture, language, compiler, image base, memory size). + + Args: + port: Ghidra instance port (optional) + + Returns: + Program metadata including architecture, language, compiler spec, + image base address, and total memory size + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + response = self.safe_get(port, "program") + return self.simplify_response(response) + @mcp_resource(uri="ghidra://instances") def resource_instances_list(self) -> Dict[str, Any]: """MCP Resource: List all active Ghidra instances. @@ -297,3 +322,21 @@ class InstancesMixin(GhydraMixinBase): "string_count": string_count, "port": port, } + + @mcp_resource(uri="ghidra://instance/{port}/program") + def resource_program_info(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: Get program metadata for a Ghidra instance. + + Args: + port: Ghidra instance port + + Returns: + Program metadata (architecture, language, compiler, image base) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + response = self.safe_get(port, "program") + return self.simplify_response(response) diff --git a/src/ghydramcp/mixins/namespaces.py b/src/ghydramcp/mixins/namespaces.py new file mode 100644 index 0000000..a298ae1 --- /dev/null +++ b/src/ghydramcp/mixins/namespaces.py @@ -0,0 +1,211 @@ +"""Namespaces mixin for GhydraMCP. + +Provides tools for querying namespaces and class definitions. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class NamespacesMixin(GhydraMixinBase): + """Mixin for namespace and class operations. + + Provides tools for: + - Listing all non-global namespaces + - Listing class namespaces with qualified names + """ + + @mcp_tool() + def namespaces_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List all non-global namespaces with pagination. + + Args: + port: Ghidra instance port (optional) + page_size: Namespaces per page (default: 50, max: 500) + grep: Regex pattern to filter namespace names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all namespaces without pagination + fields: Field names to keep (e.g. ['name', 'id']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of namespaces + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("namespaces", 500) + response = self.safe_get(port, "namespaces", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + namespaces = simplified.get("result", []) + if not isinstance(namespaces, list): + namespaces = [] + + query_params = {"tool": "namespaces_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=namespaces, + query_params=query_params, + tool_name="namespaces_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def classes_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List class namespaces with qualified names. + + Args: + port: Ghidra instance port (optional) + page_size: Classes per page (default: 50, max: 500) + grep: Regex pattern to filter class names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all classes without pagination + fields: Field names to keep (e.g. ['name', 'qualified_name']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of class namespaces + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("classes", 500) + response = self.safe_get(port, "classes", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + classes = simplified.get("result", []) + if not isinstance(classes, list): + classes = [] + + query_params = {"tool": "classes_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=classes, + query_params=query_params, + tool_name="classes_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + # Resources + + @mcp_resource(uri="ghidra://instance/{port}/namespaces") + def resource_namespaces_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List namespaces (capped). + + Args: + port: Ghidra instance port + + Returns: + List of namespaces (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("namespaces", 500) + + response = self.safe_get(port, "namespaces", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + namespaces = simplified.get("result", []) + if not isinstance(namespaces, list): + namespaces = [] + + return { + "namespaces": namespaces[:cap], + "count": len(namespaces), + "capped_at": cap if len(namespaces) >= cap else None, + "_hint": "Use namespaces_list() tool for full pagination" + if len(namespaces) >= cap + else None, + } + + @mcp_resource(uri="ghidra://instance/{port}/classes") + def resource_classes_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List classes (capped). + + Args: + port: Ghidra instance port + + Returns: + List of class namespaces (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("classes", 500) + + response = self.safe_get(port, "classes", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + classes = simplified.get("result", []) + if not isinstance(classes, list): + classes = [] + + return { + "classes": classes[:cap], + "count": len(classes), + "capped_at": cap if len(classes) >= cap else None, + "_hint": "Use classes_list() tool for full pagination" + if len(classes) >= cap + else None, + } diff --git a/src/ghydramcp/mixins/segments.py b/src/ghydramcp/mixins/segments.py new file mode 100644 index 0000000..9082cb6 --- /dev/null +++ b/src/ghydramcp/mixins/segments.py @@ -0,0 +1,122 @@ +"""Segments mixin for GhydraMCP. + +Provides tools for querying memory segments (sections) and their permissions. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class SegmentsMixin(GhydraMixinBase): + """Mixin for memory segment operations. + + Provides tools for: + - Listing memory segments with permissions and size info + """ + + @mcp_tool() + def segments_list( + self, + name: Optional[str] = None, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List memory segments with R/W/X permissions and size info. + + Args: + name: Filter by segment name (server-side, exact match) + port: Ghidra instance port (optional) + page_size: Segments per page (default: 50, max: 500) + grep: Regex pattern to filter segment names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all segments without pagination + fields: Field names to keep (e.g. ['name', 'start', 'permissions']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of memory segments + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("segments", 500) + params = {"limit": cap} + if name: + params["name"] = name + + response = self.safe_get(port, "segments", params) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + segments = simplified.get("result", []) + if not isinstance(segments, list): + segments = [] + + query_params = {"tool": "segments_list", "port": port, "name": name, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=segments, + query_params=query_params, + tool_name="segments_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + # Resources + + @mcp_resource(uri="ghidra://instance/{port}/segments") + def resource_segments_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List memory segments (capped). + + Args: + port: Ghidra instance port + + Returns: + List of memory segments (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("segments", 500) + + response = self.safe_get(port, "segments", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + segments = simplified.get("result", []) + if not isinstance(segments, list): + segments = [] + + return { + "segments": segments[:cap], + "count": len(segments), + "capped_at": cap if len(segments) >= cap else None, + "_hint": "Use segments_list() tool for full pagination" + if len(segments) >= cap + else None, + } diff --git a/src/ghydramcp/mixins/structs.py b/src/ghydramcp/mixins/structs.py index be427e1..1463ca8 100644 --- a/src/ghydramcp/mixins/structs.py +++ b/src/ghydramcp/mixins/structs.py @@ -6,10 +6,10 @@ Provides tools for struct data type operations. from typing import Any, Dict, List, Optional from fastmcp import Context -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class StructsMixin(GhydraMixinBase): diff --git a/src/ghydramcp/mixins/symbols.py b/src/ghydramcp/mixins/symbols.py new file mode 100644 index 0000000..4f1ee0a --- /dev/null +++ b/src/ghydramcp/mixins/symbols.py @@ -0,0 +1,422 @@ +"""Symbols mixin for GhydraMCP. + +Provides tools for symbol table operations including labels, imports, and exports. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class SymbolsMixin(GhydraMixinBase): + """Mixin for symbol table operations. + + Provides tools for: + - Listing all symbols with pagination + - Querying imported symbols (external references) + - Querying exported symbols (entry points) + """ + + @mcp_tool() + def symbols_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List symbols with cursor-based pagination. + + Args: + port: Ghidra instance port (optional) + page_size: Symbols per page (default: 50, max: 500) + grep: Regex pattern to filter symbol names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all symbols without pagination + fields: Field names to keep (e.g. ['name', 'address']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of symbols + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + response = self.safe_get(port, "symbols", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + symbols = simplified.get("result", []) + if not isinstance(symbols, list): + symbols = [] + + query_params = {"tool": "symbols_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=symbols, + query_params=query_params, + tool_name="symbols_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def symbols_imports( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List imported symbols (external references) with pagination. + + Args: + port: Ghidra instance port (optional) + page_size: Imports per page (default: 50, max: 500) + grep: Regex pattern to filter import names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all imports without pagination + fields: Field names to keep. Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of imported symbols + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + response = self.safe_get(port, "symbols/imports", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + imports = simplified.get("result", []) + if not isinstance(imports, list): + imports = [] + + query_params = {"tool": "symbols_imports", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=imports, + query_params=query_params, + tool_name="symbols_imports", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def symbols_exports( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List exported symbols (entry points) with pagination. + + Args: + port: Ghidra instance port (optional) + page_size: Exports per page (default: 50, max: 500) + grep: Regex pattern to filter export names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all exports without pagination + fields: Field names to keep. Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of exported symbols + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + response = self.safe_get(port, "symbols/exports", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + exports = simplified.get("result", []) + if not isinstance(exports, list): + exports = [] + + query_params = {"tool": "symbols_exports", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=exports, + query_params=query_params, + tool_name="symbols_exports", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def symbols_create( + self, + name: str, + address: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Create a new label/symbol at the specified address. + + Args: + name: Name for the new symbol + address: Memory address in hex format + port: Ghidra instance port (optional) + + Returns: + Created symbol information + """ + if not name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "name parameter is required", + }, + } + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = {"name": name, "address": address} + response = self.safe_post(port, "symbols", payload) + return self.simplify_response(response) + + @mcp_tool() + def symbols_rename( + self, + address: str, + new_name: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Rename the primary symbol at the specified address. + + Args: + address: Memory address of the symbol in hex format + new_name: New name for the symbol + port: Ghidra instance port (optional) + + Returns: + Operation result with old and new names + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + if not new_name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "new_name parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + payload = {"name": new_name} + response = self.safe_patch(port, f"symbols/{address}", payload) + return self.simplify_response(response) + + @mcp_tool() + def symbols_delete( + self, + address: str, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Delete the primary symbol at the specified address. + + Args: + address: Memory address of the symbol in hex format + port: Ghidra instance port (optional) + + Returns: + Operation result + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + response = self.safe_delete(port, f"symbols/{address}") + return self.simplify_response(response) + + # Resources + + @mcp_resource(uri="ghidra://instance/{port}/symbols") + def resource_symbols_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List symbols (capped). + + Args: + port: Ghidra instance port + + Returns: + List of symbols (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + + response = self.safe_get(port, "symbols", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + symbols = simplified.get("result", []) + if not isinstance(symbols, list): + symbols = [] + + return { + "symbols": symbols[:cap], + "count": len(symbols), + "capped_at": cap if len(symbols) >= cap else None, + "_hint": "Use symbols_list() tool for full pagination" if len(symbols) >= cap else None, + } + + @mcp_resource(uri="ghidra://instance/{port}/symbols/imports") + def resource_symbols_imports(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List imported symbols (capped). + + Args: + port: Ghidra instance port + + Returns: + List of imported symbols (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + + response = self.safe_get(port, "symbols/imports", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + imports = simplified.get("result", []) + if not isinstance(imports, list): + imports = [] + + return { + "imports": imports[:cap], + "count": len(imports), + "capped_at": cap if len(imports) >= cap else None, + "_hint": "Use symbols_imports() tool for full pagination" + if len(imports) >= cap + else None, + } + + @mcp_resource(uri="ghidra://instance/{port}/symbols/exports") + def resource_symbols_exports(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List exported symbols (capped). + + Args: + port: Ghidra instance port + + Returns: + List of exported symbols (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + + response = self.safe_get(port, "symbols/exports", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + exports = simplified.get("result", []) + if not isinstance(exports, list): + exports = [] + + return { + "exports": exports[:cap], + "count": len(exports), + "capped_at": cap if len(exports) >= cap else None, + "_hint": "Use symbols_exports() tool for full pagination" + if len(exports) >= cap + else None, + } diff --git a/src/ghydramcp/mixins/variables.py b/src/ghydramcp/mixins/variables.py new file mode 100644 index 0000000..a8c7707 --- /dev/null +++ b/src/ghydramcp/mixins/variables.py @@ -0,0 +1,261 @@ +"""Variables mixin for GhydraMCP. + +Provides tools for querying global and function-local variables. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class VariablesMixin(GhydraMixinBase): + """Mixin for variable operations. + + Provides tools for: + - Listing global and function variables + - Querying local variables and parameters for a specific function + """ + + @mcp_tool() + def variables_list( + self, + global_only: bool = False, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List variables with cursor-based pagination. + + Args: + global_only: Only return global variables (default: False) + port: Ghidra instance port (optional) + page_size: Variables per page (default: 50, max: 500) + grep: Regex pattern to filter variable names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all variables without pagination + fields: Field names to keep (e.g. ['name', 'type', 'address']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of variables + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("variables", 1000) + params = {"limit": cap} + if global_only: + params["global_only"] = "true" + + response = self.safe_get(port, "variables", params) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + variables = simplified.get("result", []) + if not isinstance(variables, list): + variables = [] + + query_params = { + "tool": "variables_list", + "port": port, + "global_only": global_only, + "grep": grep, + } + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=variables, + query_params=query_params, + tool_name="variables_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def functions_variables( + self, + address: str, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List local variables and parameters for a specific function. + + Args: + address: Function address in hex format + port: Ghidra instance port (optional) + page_size: Variables per page (default: 50, max: 500) + grep: Regex pattern to filter variable names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all variables without pagination + fields: Field names to keep (e.g. ['name', 'type', 'storage']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of function variables + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + response = self.safe_get(port, f"functions/{address}/variables") + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + variables = simplified.get("result", []) + if not isinstance(variables, list): + variables = [] + + query_params = { + "tool": "functions_variables", + "port": port, + "address": address, + "grep": grep, + } + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=variables, + query_params=query_params, + tool_name="functions_variables", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def variables_rename( + self, + function_address: str, + variable_name: str, + new_name: str, + new_type: Optional[str] = None, + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Rename a variable (and optionally retype) within a function. + + Args: + function_address: Function address in hex format + variable_name: Current name of the variable + new_name: New name for the variable + new_type: New data type (optional, e.g. "int", "char*") + port: Ghidra instance port (optional) + + Returns: + Operation result + """ + if not function_address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "function_address parameter is required", + }, + } + if not variable_name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "variable_name parameter is required", + }, + } + if not new_name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "new_name parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + from urllib.parse import quote + + payload: dict = {"name": new_name} + if new_type: + payload["data_type"] = new_type + + endpoint = f"functions/{function_address}/variables/{quote(variable_name)}" + response = self.safe_patch(port, endpoint, payload) + return self.simplify_response(response) + + # Resources + + @mcp_resource(uri="ghidra://instance/{port}/variables") + def resource_variables_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List variables (capped). + + Args: + port: Ghidra instance port + + Returns: + List of variables (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("variables", 1000) + + response = self.safe_get(port, "variables", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + variables = simplified.get("result", []) + if not isinstance(variables, list): + variables = [] + + return { + "variables": variables[:cap], + "count": len(variables), + "capped_at": cap if len(variables) >= cap else None, + "_hint": "Use variables_list() tool for full pagination" + if len(variables) >= cap + else None, + } diff --git a/src/ghydramcp/mixins/xrefs.py b/src/ghydramcp/mixins/xrefs.py index 04bbe33..5d009a3 100644 --- a/src/ghydramcp/mixins/xrefs.py +++ b/src/ghydramcp/mixins/xrefs.py @@ -6,10 +6,10 @@ Provides tools for cross-reference (xref) operations. from typing import Any, Dict, List, Optional from fastmcp import Context -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class XrefsMixin(GhydraMixinBase): diff --git a/src/ghydramcp/server.py b/src/ghydramcp/server.py index b0e31f7..959e4b1 100644 --- a/src/ghydramcp/server.py +++ b/src/ghydramcp/server.py @@ -13,17 +13,23 @@ from typing import Optional from fastmcp import FastMCP -from .config import get_config, set_config, GhydraConfig +from .config import GhydraConfig, get_config, set_config from .mixins import ( - InstancesMixin, - FunctionsMixin, - DataMixin, - StructsMixin, AnalysisMixin, - MemoryMixin, - XrefsMixin, + BookmarksMixin, CursorsMixin, + DataMixin, + DataTypesMixin, DockerMixin, + FunctionsMixin, + InstancesMixin, + MemoryMixin, + NamespacesMixin, + SegmentsMixin, + StructsMixin, + SymbolsMixin, + VariablesMixin, + XrefsMixin, ) @@ -56,6 +62,12 @@ def create_server( xrefs_mixin = XrefsMixin() cursors_mixin = CursorsMixin() docker_mixin = DockerMixin() + symbols_mixin = SymbolsMixin() + segments_mixin = SegmentsMixin() + variables_mixin = VariablesMixin() + namespaces_mixin = NamespacesMixin() + bookmarks_mixin = BookmarksMixin() + datatypes_mixin = DataTypesMixin() # Register all mixins with the server # Each mixin registers its tools, resources, and prompts @@ -68,6 +80,12 @@ def create_server( xrefs_mixin.register_all(mcp) cursors_mixin.register_all(mcp) docker_mixin.register_all(mcp) + symbols_mixin.register_all(mcp) + segments_mixin.register_all(mcp) + variables_mixin.register_all(mcp) + namespaces_mixin.register_all(mcp) + bookmarks_mixin.register_all(mcp) + datatypes_mixin.register_all(mcp) # Optional feedback collection cfg = get_config() @@ -87,30 +105,41 @@ def create_server( def _periodic_discovery(interval: int = 30): """Background thread for periodic instance discovery. + Uses a short timeout per port so a full scan completes quickly + even when most ports are unreachable. + Args: interval: Seconds between discovery attempts """ + import requests as _requests + from .mixins.base import GhydraMixinBase - from .core.http_client import safe_get config = get_config() while True: time.sleep(interval) try: - # Quick scan of common ports + # Quick scan — use discovery_timeout (0.5s), NOT request_timeout (30s) for port in config.quick_discovery_range: try: - response = safe_get(port, "") - if response.get("success", False): - with GhydraMixinBase._instances_lock: - if port not in GhydraMixinBase._instances: - GhydraMixinBase._instances[port] = { - "url": f"http://{config.ghidra_host}:{port}", - "project": response.get("project", ""), - "file": response.get("file", ""), - "discovered_at": time.time(), - } + url = f"http://{config.ghidra_host}:{port}/" + resp = _requests.get( + url, + timeout=config.discovery_timeout, + headers={"Accept": "application/json"}, + ) + if resp.ok: + response = resp.json() + if response.get("success", False): + with GhydraMixinBase._instances_lock: + if port not in GhydraMixinBase._instances: + GhydraMixinBase._instances[port] = { + "url": url.rstrip("/"), + "project": response.get("project", ""), + "file": response.get("file", ""), + "discovered_at": time.time(), + } except Exception: pass except Exception: @@ -154,8 +183,8 @@ def main(): # Initial instance discovery print(f" Discovering Ghidra instances on {config.ghidra_host}...", file=sys.stderr) - from .mixins.base import GhydraMixinBase from .core.http_client import safe_get + from .mixins.base import GhydraMixinBase found = 0 for port in config.quick_discovery_range: