diff --git a/src/ghydramcp/config.py b/src/ghydramcp/config.py index 85076a2..50f3115 100644 --- a/src/ghydramcp/config.py +++ b/src/ghydramcp/config.py @@ -97,6 +97,11 @@ class GhydraConfig: "data": 1000, "structs": 500, "xrefs": 500, + "symbols": 1000, + "segments": 500, + "variables": 1000, + "namespaces": 500, + "classes": 500, }) def __post_init__(self): diff --git a/src/ghydramcp/core/__init__.py b/src/ghydramcp/core/__init__.py index 74c6cec..72901c8 100644 --- a/src/ghydramcp/core/__init__.py +++ b/src/ghydramcp/core/__init__.py @@ -3,38 +3,38 @@ Contains HTTP client, pagination, progress reporting, and logging utilities. """ +from .filtering import ( + apply_grep, + estimate_and_guard, + project_fields, +) from .http_client import ( + get_instance_url, + safe_delete, safe_get, + safe_patch, safe_post, safe_put, - safe_patch, - safe_delete, simplify_response, - get_instance_url, +) +from .logging import ( + log_debug, + log_error, + log_info, + log_warning, ) from .pagination import ( CursorManager, CursorState, - paginate_response, - get_cursor_manager, estimate_tokens, + get_cursor_manager, + paginate_response, ) from .progress import ( ProgressReporter, report_progress, report_step, ) -from .filtering import ( - project_fields, - apply_grep, - estimate_and_guard, -) -from .logging import ( - log_info, - log_debug, - log_warning, - log_error, -) __all__ = [ # HTTP client diff --git a/src/ghydramcp/core/filtering.py b/src/ghydramcp/core/filtering.py index f735881..e845fa6 100644 --- a/src/ghydramcp/core/filtering.py +++ b/src/ghydramcp/core/filtering.py @@ -11,7 +11,6 @@ from typing import Any, Dict, Optional from ..config import get_config - # Token estimation (same ratio as pagination.py) TOKEN_ESTIMATION_RATIO = 4.0 diff --git a/src/ghydramcp/core/http_client.py b/src/ghydramcp/core/http_client.py index 08b1f81..b9c1620 100644 --- a/src/ghydramcp/core/http_client.py +++ b/src/ghydramcp/core/http_client.py @@ -12,7 +12,6 @@ import requests from ..config import get_config - # Allowed origins for CORS-like validation ALLOWED_ORIGINS = { "http://localhost", diff --git a/src/ghydramcp/core/logging.py b/src/ghydramcp/core/logging.py index b595472..3666303 100644 --- a/src/ghydramcp/core/logging.py +++ b/src/ghydramcp/core/logging.py @@ -5,7 +5,7 @@ client-visible logging when available, with fallback to standard logging. """ import logging -from typing import Optional, TYPE_CHECKING +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from mcp.server.fastmcp import Context diff --git a/src/ghydramcp/core/pagination.py b/src/ghydramcp/core/pagination.py index 2eea804..3af1d47 100644 --- a/src/ghydramcp/core/pagination.py +++ b/src/ghydramcp/core/pagination.py @@ -14,8 +14,7 @@ from threading import Lock from typing import Any, Dict, List, Optional, Tuple from ..config import get_config -from .filtering import project_fields, estimate_and_guard - +from .filtering import estimate_and_guard, project_fields # ReDoS Protection Configuration MAX_GREP_PATTERN_LENGTH = 500 diff --git a/src/ghydramcp/core/progress.py b/src/ghydramcp/core/progress.py index 5a63cd8..0f5d624 100644 --- a/src/ghydramcp/core/progress.py +++ b/src/ghydramcp/core/progress.py @@ -4,7 +4,7 @@ Provides async progress reporting using FastMCP's Context for real-time progress notifications to MCP clients. """ -from typing import Optional, TYPE_CHECKING +from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: from mcp.server.fastmcp import Context diff --git a/src/ghydramcp/mixins/__init__.py b/src/ghydramcp/mixins/__init__.py index 919112b..4a2530c 100644 --- a/src/ghydramcp/mixins/__init__.py +++ b/src/ghydramcp/mixins/__init__.py @@ -4,16 +4,20 @@ Domain-specific mixins that organize tools, resources, and prompts by functional Uses FastMCP's contrib.mcp_mixin pattern for clean modular organization. """ -from .base import GhydraMixinBase -from .instances import InstancesMixin -from .functions import FunctionsMixin -from .data import DataMixin -from .structs import StructsMixin from .analysis import AnalysisMixin -from .memory import MemoryMixin -from .xrefs import XrefsMixin +from .base import GhydraMixinBase from .cursors import CursorsMixin +from .data import DataMixin from .docker import DockerMixin +from .functions import FunctionsMixin +from .instances import InstancesMixin +from .memory import MemoryMixin +from .namespaces import NamespacesMixin +from .segments import SegmentsMixin +from .structs import StructsMixin +from .symbols import SymbolsMixin +from .variables import VariablesMixin +from .xrefs import XrefsMixin __all__ = [ "GhydraMixinBase", @@ -26,4 +30,8 @@ __all__ = [ "XrefsMixin", "CursorsMixin", "DockerMixin", + "SymbolsMixin", + "SegmentsMixin", + "VariablesMixin", + "NamespacesMixin", ] diff --git a/src/ghydramcp/mixins/analysis.py b/src/ghydramcp/mixins/analysis.py index 1af360a..fef6055 100644 --- a/src/ghydramcp/mixins/analysis.py +++ b/src/ghydramcp/mixins/analysis.py @@ -8,8 +8,8 @@ from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class AnalysisMixin(GhydraMixinBase): @@ -277,6 +277,40 @@ class AnalysisMixin(GhydraMixinBase): response = self.safe_get(port, "function") return self.simplify_response(response) + @mcp_tool() + def comments_get( + self, + address: str, + comment_type: str = "plate", + port: Optional[int] = None, + ) -> Dict[str, Any]: + """Get a comment at the specified address. + + Args: + address: Memory address in hex format + comment_type: "plate", "pre", "post", "eol", "repeatable" + port: Ghidra instance port (optional) + + Returns: + Comment text and metadata + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + response = self.safe_get(port, f"memory/{address}/comments/{comment_type}") + return self.simplify_response(response) + @mcp_tool() def comments_set( self, diff --git a/src/ghydramcp/mixins/base.py b/src/ghydramcp/mixins/base.py index 6bbf048..e191026 100644 --- a/src/ghydramcp/mixins/base.py +++ b/src/ghydramcp/mixins/base.py @@ -11,9 +11,16 @@ from fastmcp import Context from fastmcp.contrib.mcp_mixin import MCPMixin from ..config import get_config -from ..core.http_client import safe_get, safe_post, safe_put, safe_patch, safe_delete, simplify_response +from ..core.http_client import ( + safe_delete, + safe_get, + safe_patch, + safe_post, + safe_put, + simplify_response, +) +from ..core.logging import log_debug, log_error, log_info, log_warning from ..core.pagination import paginate_response -from ..core.logging import log_info, log_debug, log_warning, log_error class GhydraMixinBase(MCPMixin): diff --git a/src/ghydramcp/mixins/cursors.py b/src/ghydramcp/mixins/cursors.py index a1cb68c..0f4324e 100644 --- a/src/ghydramcp/mixins/cursors.py +++ b/src/ghydramcp/mixins/cursors.py @@ -8,8 +8,8 @@ from typing import Any, Dict, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool -from .base import GhydraMixinBase from ..core.pagination import get_cursor_manager +from .base import GhydraMixinBase class CursorsMixin(GhydraMixinBase): diff --git a/src/ghydramcp/mixins/data.py b/src/ghydramcp/mixins/data.py index dde51ec..7c43c5c 100644 --- a/src/ghydramcp/mixins/data.py +++ b/src/ghydramcp/mixins/data.py @@ -6,10 +6,10 @@ Provides tools for data items and strings operations. from typing import Any, Dict, List, Optional from fastmcp import Context -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class DataMixin(GhydraMixinBase): diff --git a/src/ghydramcp/mixins/docker.py b/src/ghydramcp/mixins/docker.py index 51add07..243e165 100644 --- a/src/ghydramcp/mixins/docker.py +++ b/src/ghydramcp/mixins/docker.py @@ -16,14 +16,11 @@ import subprocess import time import uuid from pathlib import Path -from typing import Any, Dict, List, Optional, Set +from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool -from ..config import get_config, get_docker_config - - # Port pool configuration PORT_POOL_START = 8192 PORT_POOL_END = 8199 @@ -823,9 +820,9 @@ class DockerMixin(MCPMixin): Returns: Health status and API info if available """ - import urllib.request - import urllib.error import json as json_module + import urllib.error + import urllib.request url = f"http://localhost:{port}/" diff --git a/src/ghydramcp/mixins/functions.py b/src/ghydramcp/mixins/functions.py index 3eb129b..94914dd 100644 --- a/src/ghydramcp/mixins/functions.py +++ b/src/ghydramcp/mixins/functions.py @@ -7,10 +7,10 @@ from typing import Any, Dict, List, Optional from urllib.parse import quote from fastmcp import Context -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class FunctionsMixin(GhydraMixinBase): @@ -28,6 +28,9 @@ class FunctionsMixin(GhydraMixinBase): @mcp_tool() def functions_list( self, + name_contains: Optional[str] = None, + name_regex: Optional[str] = None, + address: Optional[str] = None, port: Optional[int] = None, page_size: int = 50, grep: Optional[str] = None, @@ -36,12 +39,15 @@ class FunctionsMixin(GhydraMixinBase): fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: - """List functions with cursor-based pagination. + """List functions with cursor-based pagination and server-side filtering. Args: + name_contains: Server-side substring filter on function name (faster than grep for large binaries) + name_regex: Server-side regex filter on function name + address: Filter by exact function address (hex) port: Ghidra instance port (optional) page_size: Functions per page (default: 50, max: 500) - grep: Regex pattern to filter function names + grep: Client-side regex pattern to filter function names grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all functions without pagination fields: Field names to keep (e.g. ['name', 'address']). Reduces response size. @@ -56,7 +62,15 @@ class FunctionsMixin(GhydraMixinBase): return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} config = get_config() - response = self.safe_get(port, "functions", {"limit": 10000}) + params = {"limit": 10000} + if name_contains: + params["name_contains"] = name_contains + if name_regex: + params["name_matches_regex"] = name_regex + if address: + params["addr"] = address + + response = self.safe_get(port, "functions", params) simplified = self.simplify_response(response) if not simplified.get("success", True): @@ -66,7 +80,14 @@ class FunctionsMixin(GhydraMixinBase): if not isinstance(functions, list): functions = [] - query_params = {"tool": "functions_list", "port": port, "grep": grep} + query_params = { + "tool": "functions_list", + "port": port, + "name_contains": name_contains, + "name_regex": name_regex, + "address": address, + "grep": grep, + } session_id = self._get_session_id(ctx) return self.filtered_paginate( @@ -467,7 +488,7 @@ class FunctionsMixin(GhydraMixinBase): "functions": functions[:cap], "count": len(functions), "capped_at": cap if len(functions) >= cap else None, - "_hint": f"Use functions_list() tool for full pagination" if len(functions) >= cap else None, + "_hint": "Use functions_list() tool for full pagination" if len(functions) >= cap else None, } @mcp_resource(uri="ghidra://instance/{port}/function/decompile/address/{address}") diff --git a/src/ghydramcp/mixins/instances.py b/src/ghydramcp/mixins/instances.py index e92f4d4..51a3df2 100644 --- a/src/ghydramcp/mixins/instances.py +++ b/src/ghydramcp/mixins/instances.py @@ -4,13 +4,12 @@ Provides tools for discovering, registering, and managing Ghidra instances. """ import time -from typing import Any, Dict, List, Optional +from typing import Any, Dict, Optional -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config -from ..core.http_client import safe_get +from .base import GhydraMixinBase class InstancesMixin(GhydraMixinBase): @@ -211,6 +210,25 @@ class InstancesMixin(GhydraMixinBase): return {"port": port, "status": "registered but no details available"} + @mcp_tool() + def program_info(self, port: Optional[int] = None) -> Dict[str, Any]: + """Get full program metadata (architecture, language, compiler, image base, memory size). + + Args: + port: Ghidra instance port (optional) + + Returns: + Program metadata including architecture, language, compiler spec, + image base address, and total memory size + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + response = self.safe_get(port, "program") + return self.simplify_response(response) + @mcp_resource(uri="ghidra://instances") def resource_instances_list(self) -> Dict[str, Any]: """MCP Resource: List all active Ghidra instances. @@ -297,3 +315,21 @@ class InstancesMixin(GhydraMixinBase): "string_count": string_count, "port": port, } + + @mcp_resource(uri="ghidra://instance/{port}/program") + def resource_program_info(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: Get program metadata for a Ghidra instance. + + Args: + port: Ghidra instance port + + Returns: + Program metadata (architecture, language, compiler, image base) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + response = self.safe_get(port, "program") + return self.simplify_response(response) diff --git a/src/ghydramcp/mixins/namespaces.py b/src/ghydramcp/mixins/namespaces.py new file mode 100644 index 0000000..a298ae1 --- /dev/null +++ b/src/ghydramcp/mixins/namespaces.py @@ -0,0 +1,211 @@ +"""Namespaces mixin for GhydraMCP. + +Provides tools for querying namespaces and class definitions. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class NamespacesMixin(GhydraMixinBase): + """Mixin for namespace and class operations. + + Provides tools for: + - Listing all non-global namespaces + - Listing class namespaces with qualified names + """ + + @mcp_tool() + def namespaces_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List all non-global namespaces with pagination. + + Args: + port: Ghidra instance port (optional) + page_size: Namespaces per page (default: 50, max: 500) + grep: Regex pattern to filter namespace names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all namespaces without pagination + fields: Field names to keep (e.g. ['name', 'id']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of namespaces + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("namespaces", 500) + response = self.safe_get(port, "namespaces", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + namespaces = simplified.get("result", []) + if not isinstance(namespaces, list): + namespaces = [] + + query_params = {"tool": "namespaces_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=namespaces, + query_params=query_params, + tool_name="namespaces_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def classes_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List class namespaces with qualified names. + + Args: + port: Ghidra instance port (optional) + page_size: Classes per page (default: 50, max: 500) + grep: Regex pattern to filter class names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all classes without pagination + fields: Field names to keep (e.g. ['name', 'qualified_name']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of class namespaces + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("classes", 500) + response = self.safe_get(port, "classes", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + classes = simplified.get("result", []) + if not isinstance(classes, list): + classes = [] + + query_params = {"tool": "classes_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=classes, + query_params=query_params, + tool_name="classes_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + # Resources + + @mcp_resource(uri="ghidra://instance/{port}/namespaces") + def resource_namespaces_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List namespaces (capped). + + Args: + port: Ghidra instance port + + Returns: + List of namespaces (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("namespaces", 500) + + response = self.safe_get(port, "namespaces", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + namespaces = simplified.get("result", []) + if not isinstance(namespaces, list): + namespaces = [] + + return { + "namespaces": namespaces[:cap], + "count": len(namespaces), + "capped_at": cap if len(namespaces) >= cap else None, + "_hint": "Use namespaces_list() tool for full pagination" + if len(namespaces) >= cap + else None, + } + + @mcp_resource(uri="ghidra://instance/{port}/classes") + def resource_classes_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List classes (capped). + + Args: + port: Ghidra instance port + + Returns: + List of class namespaces (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("classes", 500) + + response = self.safe_get(port, "classes", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + classes = simplified.get("result", []) + if not isinstance(classes, list): + classes = [] + + return { + "classes": classes[:cap], + "count": len(classes), + "capped_at": cap if len(classes) >= cap else None, + "_hint": "Use classes_list() tool for full pagination" + if len(classes) >= cap + else None, + } diff --git a/src/ghydramcp/mixins/segments.py b/src/ghydramcp/mixins/segments.py new file mode 100644 index 0000000..9082cb6 --- /dev/null +++ b/src/ghydramcp/mixins/segments.py @@ -0,0 +1,122 @@ +"""Segments mixin for GhydraMCP. + +Provides tools for querying memory segments (sections) and their permissions. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class SegmentsMixin(GhydraMixinBase): + """Mixin for memory segment operations. + + Provides tools for: + - Listing memory segments with permissions and size info + """ + + @mcp_tool() + def segments_list( + self, + name: Optional[str] = None, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List memory segments with R/W/X permissions and size info. + + Args: + name: Filter by segment name (server-side, exact match) + port: Ghidra instance port (optional) + page_size: Segments per page (default: 50, max: 500) + grep: Regex pattern to filter segment names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all segments without pagination + fields: Field names to keep (e.g. ['name', 'start', 'permissions']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of memory segments + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("segments", 500) + params = {"limit": cap} + if name: + params["name"] = name + + response = self.safe_get(port, "segments", params) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + segments = simplified.get("result", []) + if not isinstance(segments, list): + segments = [] + + query_params = {"tool": "segments_list", "port": port, "name": name, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=segments, + query_params=query_params, + tool_name="segments_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + # Resources + + @mcp_resource(uri="ghidra://instance/{port}/segments") + def resource_segments_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List memory segments (capped). + + Args: + port: Ghidra instance port + + Returns: + List of memory segments (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("segments", 500) + + response = self.safe_get(port, "segments", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + segments = simplified.get("result", []) + if not isinstance(segments, list): + segments = [] + + return { + "segments": segments[:cap], + "count": len(segments), + "capped_at": cap if len(segments) >= cap else None, + "_hint": "Use segments_list() tool for full pagination" + if len(segments) >= cap + else None, + } diff --git a/src/ghydramcp/mixins/structs.py b/src/ghydramcp/mixins/structs.py index be427e1..1463ca8 100644 --- a/src/ghydramcp/mixins/structs.py +++ b/src/ghydramcp/mixins/structs.py @@ -6,10 +6,10 @@ Provides tools for struct data type operations. from typing import Any, Dict, List, Optional from fastmcp import Context -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class StructsMixin(GhydraMixinBase): diff --git a/src/ghydramcp/mixins/symbols.py b/src/ghydramcp/mixins/symbols.py new file mode 100644 index 0000000..568c293 --- /dev/null +++ b/src/ghydramcp/mixins/symbols.py @@ -0,0 +1,304 @@ +"""Symbols mixin for GhydraMCP. + +Provides tools for symbol table operations including labels, imports, and exports. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class SymbolsMixin(GhydraMixinBase): + """Mixin for symbol table operations. + + Provides tools for: + - Listing all symbols with pagination + - Querying imported symbols (external references) + - Querying exported symbols (entry points) + """ + + @mcp_tool() + def symbols_list( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List symbols with cursor-based pagination. + + Args: + port: Ghidra instance port (optional) + page_size: Symbols per page (default: 50, max: 500) + grep: Regex pattern to filter symbol names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all symbols without pagination + fields: Field names to keep (e.g. ['name', 'address']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of symbols + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + response = self.safe_get(port, "symbols", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + symbols = simplified.get("result", []) + if not isinstance(symbols, list): + symbols = [] + + query_params = {"tool": "symbols_list", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=symbols, + query_params=query_params, + tool_name="symbols_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def symbols_imports( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List imported symbols (external references) with pagination. + + Args: + port: Ghidra instance port (optional) + page_size: Imports per page (default: 50, max: 500) + grep: Regex pattern to filter import names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all imports without pagination + fields: Field names to keep. Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of imported symbols + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + response = self.safe_get(port, "symbols/imports", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + imports = simplified.get("result", []) + if not isinstance(imports, list): + imports = [] + + query_params = {"tool": "symbols_imports", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=imports, + query_params=query_params, + tool_name="symbols_imports", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def symbols_exports( + self, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List exported symbols (entry points) with pagination. + + Args: + port: Ghidra instance port (optional) + page_size: Exports per page (default: 50, max: 500) + grep: Regex pattern to filter export names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all exports without pagination + fields: Field names to keep. Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of exported symbols + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + response = self.safe_get(port, "symbols/exports", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + exports = simplified.get("result", []) + if not isinstance(exports, list): + exports = [] + + query_params = {"tool": "symbols_exports", "port": port, "grep": grep} + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=exports, + query_params=query_params, + tool_name="symbols_exports", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + # Resources + + @mcp_resource(uri="ghidra://instance/{port}/symbols") + def resource_symbols_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List symbols (capped). + + Args: + port: Ghidra instance port + + Returns: + List of symbols (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + + response = self.safe_get(port, "symbols", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + symbols = simplified.get("result", []) + if not isinstance(symbols, list): + symbols = [] + + return { + "symbols": symbols[:cap], + "count": len(symbols), + "capped_at": cap if len(symbols) >= cap else None, + "_hint": "Use symbols_list() tool for full pagination" if len(symbols) >= cap else None, + } + + @mcp_resource(uri="ghidra://instance/{port}/symbols/imports") + def resource_symbols_imports(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List imported symbols (capped). + + Args: + port: Ghidra instance port + + Returns: + List of imported symbols (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + + response = self.safe_get(port, "symbols/imports", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + imports = simplified.get("result", []) + if not isinstance(imports, list): + imports = [] + + return { + "imports": imports[:cap], + "count": len(imports), + "capped_at": cap if len(imports) >= cap else None, + "_hint": "Use symbols_imports() tool for full pagination" + if len(imports) >= cap + else None, + } + + @mcp_resource(uri="ghidra://instance/{port}/symbols/exports") + def resource_symbols_exports(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List exported symbols (capped). + + Args: + port: Ghidra instance port + + Returns: + List of exported symbols (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("symbols", 1000) + + response = self.safe_get(port, "symbols/exports", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + exports = simplified.get("result", []) + if not isinstance(exports, list): + exports = [] + + return { + "exports": exports[:cap], + "count": len(exports), + "capped_at": cap if len(exports) >= cap else None, + "_hint": "Use symbols_exports() tool for full pagination" + if len(exports) >= cap + else None, + } diff --git a/src/ghydramcp/mixins/variables.py b/src/ghydramcp/mixins/variables.py new file mode 100644 index 0000000..36bc7e0 --- /dev/null +++ b/src/ghydramcp/mixins/variables.py @@ -0,0 +1,200 @@ +"""Variables mixin for GhydraMCP. + +Provides tools for querying global and function-local variables. +""" + +from typing import Any, Dict, List, Optional + +from fastmcp import Context +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool + +from ..config import get_config +from .base import GhydraMixinBase + + +class VariablesMixin(GhydraMixinBase): + """Mixin for variable operations. + + Provides tools for: + - Listing global and function variables + - Querying local variables and parameters for a specific function + """ + + @mcp_tool() + def variables_list( + self, + global_only: bool = False, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List variables with cursor-based pagination. + + Args: + global_only: Only return global variables (default: False) + port: Ghidra instance port (optional) + page_size: Variables per page (default: 50, max: 500) + grep: Regex pattern to filter variable names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all variables without pagination + fields: Field names to keep (e.g. ['name', 'type', 'address']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of variables + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + cap = config.resource_caps.get("variables", 1000) + params = {"limit": cap} + if global_only: + params["global_only"] = "true" + + response = self.safe_get(port, "variables", params) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + variables = simplified.get("result", []) + if not isinstance(variables, list): + variables = [] + + query_params = { + "tool": "variables_list", + "port": port, + "global_only": global_only, + "grep": grep, + } + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=variables, + query_params=query_params, + tool_name="variables_list", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + @mcp_tool() + def functions_variables( + self, + address: str, + port: Optional[int] = None, + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[List[str]] = None, + ctx: Optional[Context] = None, + ) -> Dict[str, Any]: + """List local variables and parameters for a specific function. + + Args: + address: Function address in hex format + port: Ghidra instance port (optional) + page_size: Variables per page (default: 50, max: 500) + grep: Regex pattern to filter variable names + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all variables without pagination + fields: Field names to keep (e.g. ['name', 'type', 'storage']). Reduces response size. + ctx: FastMCP context (auto-injected) + + Returns: + Paginated list of function variables + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required", + }, + } + + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} + + config = get_config() + response = self.safe_get(port, f"functions/{address}/variables") + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + variables = simplified.get("result", []) + if not isinstance(variables, list): + variables = [] + + query_params = { + "tool": "functions_variables", + "port": port, + "address": address, + "grep": grep, + } + session_id = self._get_session_id(ctx) + + return self.filtered_paginate( + data=variables, + query_params=query_params, + tool_name="functions_variables", + session_id=session_id, + page_size=min(page_size, config.max_page_size), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, + ) + + # Resources + + @mcp_resource(uri="ghidra://instance/{port}/variables") + def resource_variables_list(self, port: Optional[int] = None) -> Dict[str, Any]: + """MCP Resource: List variables (capped). + + Args: + port: Ghidra instance port + + Returns: + List of variables (capped) + """ + try: + port = self.get_instance_port(port) + except ValueError as e: + return {"error": str(e)} + + config = get_config() + cap = config.resource_caps.get("variables", 1000) + + response = self.safe_get(port, "variables", {"limit": cap}) + simplified = self.simplify_response(response) + + if not simplified.get("success", True): + return simplified + + variables = simplified.get("result", []) + if not isinstance(variables, list): + variables = [] + + return { + "variables": variables[:cap], + "count": len(variables), + "capped_at": cap if len(variables) >= cap else None, + "_hint": "Use variables_list() tool for full pagination" + if len(variables) >= cap + else None, + } diff --git a/src/ghydramcp/mixins/xrefs.py b/src/ghydramcp/mixins/xrefs.py index 04bbe33..5d009a3 100644 --- a/src/ghydramcp/mixins/xrefs.py +++ b/src/ghydramcp/mixins/xrefs.py @@ -6,10 +6,10 @@ Provides tools for cross-reference (xref) operations. from typing import Any, Dict, List, Optional from fastmcp import Context -from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource +from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool -from .base import GhydraMixinBase from ..config import get_config +from .base import GhydraMixinBase class XrefsMixin(GhydraMixinBase): diff --git a/src/ghydramcp/server.py b/src/ghydramcp/server.py index b0e31f7..acc3787 100644 --- a/src/ghydramcp/server.py +++ b/src/ghydramcp/server.py @@ -13,17 +13,21 @@ from typing import Optional from fastmcp import FastMCP -from .config import get_config, set_config, GhydraConfig +from .config import GhydraConfig, get_config, set_config from .mixins import ( - InstancesMixin, - FunctionsMixin, - DataMixin, - StructsMixin, AnalysisMixin, - MemoryMixin, - XrefsMixin, CursorsMixin, + DataMixin, DockerMixin, + FunctionsMixin, + InstancesMixin, + MemoryMixin, + NamespacesMixin, + SegmentsMixin, + StructsMixin, + SymbolsMixin, + VariablesMixin, + XrefsMixin, ) @@ -56,6 +60,10 @@ def create_server( xrefs_mixin = XrefsMixin() cursors_mixin = CursorsMixin() docker_mixin = DockerMixin() + symbols_mixin = SymbolsMixin() + segments_mixin = SegmentsMixin() + variables_mixin = VariablesMixin() + namespaces_mixin = NamespacesMixin() # Register all mixins with the server # Each mixin registers its tools, resources, and prompts @@ -68,6 +76,10 @@ def create_server( xrefs_mixin.register_all(mcp) cursors_mixin.register_all(mcp) docker_mixin.register_all(mcp) + symbols_mixin.register_all(mcp) + segments_mixin.register_all(mcp) + variables_mixin.register_all(mcp) + namespaces_mixin.register_all(mcp) # Optional feedback collection cfg = get_config() @@ -90,8 +102,8 @@ def _periodic_discovery(interval: int = 30): Args: interval: Seconds between discovery attempts """ - from .mixins.base import GhydraMixinBase from .core.http_client import safe_get + from .mixins.base import GhydraMixinBase config = get_config() @@ -154,8 +166,8 @@ def main(): # Initial instance discovery print(f" Discovering Ghidra instances on {config.ghidra_host}...", file=sys.stderr) - from .mixins.base import GhydraMixinBase from .core.http_client import safe_get + from .mixins.base import GhydraMixinBase found = 0 for port in config.quick_discovery_range: