From 70f226f68e2127c7f8f443fd3a094de25520e74f Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Thu, 29 Jan 2026 16:07:06 -0700 Subject: [PATCH] feat: Add response size guard with field projection and server-side grep return_all=True on large binaries (1800+ functions) produced 72K char responses that exceeded the MCP tool result limit. Instead of truncating, oversized responses now return a structured summary with sample data, available fields, and actionable instructions for narrowing the query. Three layers of filtering: - Server-side grep: Jython HTTP handlers filter during Ghidra iteration - Field projection: jq-style key selection strips unneeded fields - Token budget guard: responses exceeding 8k tokens return a summary New files: core/filtering.py (project_fields, apply_grep, estimate_and_guard) Modified: config, pagination, base mixin, all 5 domain mixins, headless server --- docker/GhydraMCPServer.py | 92 +++++++++++-- src/ghydramcp/config.py | 4 + src/ghydramcp/core/__init__.py | 9 ++ src/ghydramcp/core/filtering.py | 208 ++++++++++++++++++++++++++++++ src/ghydramcp/core/pagination.py | 29 ++++- src/ghydramcp/mixins/analysis.py | 17 ++- src/ghydramcp/mixins/base.py | 36 +++++- src/ghydramcp/mixins/data.py | 12 +- src/ghydramcp/mixins/functions.py | 25 ++-- src/ghydramcp/mixins/structs.py | 16 ++- src/ghydramcp/mixins/xrefs.py | 7 +- 11 files changed, 413 insertions(+), 42 deletions(-) create mode 100644 src/ghydramcp/core/filtering.py diff --git a/docker/GhydraMCPServer.py b/docker/GhydraMCPServer.py index 216289b..d44a800 100644 --- a/docker/GhydraMCPServer.py +++ b/docker/GhydraMCPServer.py @@ -112,6 +112,41 @@ def make_link(href): return {"href": href} +def compile_grep(params): + """Compile a grep pattern from query params if present. + + Returns a compiled regex, or None if the grep param is absent or fails to compile. + Always compiled with re.IGNORECASE. 
+ """ + grep = params.get("grep") + if not grep: + return None + try: + return re.compile(grep, re.IGNORECASE) + except: + return None + + +def grep_matches_item(item, pattern): + """Check if any string value in item matches the grep pattern. + + Searches all string values in dict items, or the string + representation of non-dict items. + """ + if pattern is None: + return True + if isinstance(item, dict): + for value in item.values(): + if isinstance(value, (str,)): + if pattern.search(value): + return True + elif isinstance(value, (int, float, bool)): + if pattern.search(str(value)): + return True + return False + return bool(pattern.search(str(item))) + + def with_transaction(program, desc, fn): """Execute fn inside a thread-safe Ghidra transaction.""" _tx_lock.acquire() @@ -699,6 +734,7 @@ class GhydraMCPHandler(HttpHandler): limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) name_filter = params.get("name") + grep_pattern = compile_grep(params) functions = [] fm = self.program.getFunctionManager() @@ -716,7 +752,7 @@ class GhydraMCPHandler(HttpHandler): skipped += 1 continue addr = str(func.getEntryPoint()) - functions.append({ + item = { "name": func.getName(), "address": addr, "signature": str(func.getSignature()), @@ -727,7 +763,10 @@ class GhydraMCPHandler(HttpHandler): "decompile": make_link("/functions/%s/decompile" % addr), "disassembly": make_link("/functions/%s/disassembly" % addr), }, - }) + } + if not grep_matches_item(item, grep_pattern): + continue + functions.append(item) count += 1 result = { @@ -976,6 +1015,7 @@ class GhydraMCPHandler(HttpHandler): name_filter = params.get("name") name_contains = params.get("name_contains") type_filter = params.get("type") + grep_pattern = compile_grep(params) # Single address lookup if addr_filter: @@ -1036,6 +1076,8 @@ class GhydraMCPHandler(HttpHandler): if sym: item["name"] = sym.getName() item["_links"] = {"self": make_link("/data/%s" % str(data.getAddress()))} + if not 
grep_matches_item(item, grep_pattern): + continue data_items.append(item) count += 1 @@ -1161,6 +1203,7 @@ class GhydraMCPHandler(HttpHandler): offset = parse_int(params.get("offset"), 0) filter_str = params.get("filter") min_length = parse_int(params.get("min_length"), 2) + grep_pattern = compile_grep(params) strings = [] listing = self.program.getListing() @@ -1206,6 +1249,8 @@ class GhydraMCPHandler(HttpHandler): sym = self.program.getSymbolTable().getPrimarySymbol(data.getAddress()) if sym: item["name"] = sym.getName() + if not grep_matches_item(item, grep_pattern): + continue strings.append(item) count += 1 except: @@ -1392,6 +1437,7 @@ class GhydraMCPHandler(HttpHandler): offset = parse_int(params.get("offset"), 0) name_filter = params.get("name") type_filter = params.get("type") + grep_pattern = compile_grep(params) symbols = [] st = self.program.getSymbolTable() @@ -1408,14 +1454,17 @@ class GhydraMCPHandler(HttpHandler): if skipped < offset: skipped += 1 continue - symbols.append({ + item = { "name": symbol.getName(), "address": str(symbol.getAddress()), "namespace": symbol.getParentNamespace().getName(), "type": str(symbol.getSymbolType()), "isPrimary": symbol.isPrimary(), "isExternal": symbol.isExternal(), - }) + } + if not grep_matches_item(item, grep_pattern): + continue + symbols.append(item) count += 1 return { @@ -1436,6 +1485,7 @@ class GhydraMCPHandler(HttpHandler): params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) + grep_pattern = compile_grep(params) imports = [] count = 0 @@ -1446,11 +1496,14 @@ class GhydraMCPHandler(HttpHandler): if skipped < offset: skipped += 1 continue - imports.append({ + item = { "name": symbol.getName(), "address": str(symbol.getAddress()), "namespace": symbol.getParentNamespace().getName(), - }) + } + if not grep_matches_item(item, grep_pattern): + continue + imports.append(item) count += 1 return {"success": True, "result": imports, "offset": 
offset, "limit": limit} @@ -1461,6 +1514,7 @@ class GhydraMCPHandler(HttpHandler): params = parse_query_params(exchange) limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) + grep_pattern = compile_grep(params) exports = [] count = 0 @@ -1473,10 +1527,13 @@ class GhydraMCPHandler(HttpHandler): if skipped < offset: skipped += 1 continue - exports.append({ + item = { "name": symbol.getName(), "address": str(symbol.getAddress()), - }) + } + if not grep_matches_item(item, grep_pattern): + continue + exports.append(item) count += 1 return {"success": True, "result": exports, "offset": offset, "limit": limit} @@ -1494,6 +1551,7 @@ class GhydraMCPHandler(HttpHandler): type_filter = params.get("type") limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) + grep_pattern = compile_grep(params) if not to_addr_str and not from_addr_str: return {"success": False, "error": { @@ -1517,7 +1575,10 @@ class GhydraMCPHandler(HttpHandler): if skipped < offset: skipped += 1 continue - xrefs.append(self._build_xref_info(ref)) + item = self._build_xref_info(ref) + if not grep_matches_item(item, grep_pattern): + continue + xrefs.append(item) count += 1 if from_addr_str: @@ -1533,7 +1594,10 @@ class GhydraMCPHandler(HttpHandler): if skipped < offset: skipped += 1 continue - xrefs.append(self._build_xref_info(ref)) + item = self._build_xref_info(ref) + if not grep_matches_item(item, grep_pattern): + continue + xrefs.append(item) count += 1 return {"success": True, "result": xrefs, "offset": offset, "limit": limit} @@ -1623,6 +1687,7 @@ class GhydraMCPHandler(HttpHandler): limit = parse_int(params.get("limit"), 100) offset = parse_int(params.get("offset"), 0) category_filter = params.get("category") + grep_pattern = compile_grep(params) from ghidra.program.model.data import Structure, Union @@ -1641,7 +1706,7 @@ class GhydraMCPHandler(HttpHandler): if skipped < offset: skipped += 1 continue - structs.append({ + item = { 
"name": dt.getName(), "category": dt.getCategoryPath().getPath(), "path": dt.getPathName(), @@ -1649,7 +1714,10 @@ class GhydraMCPHandler(HttpHandler): "type": "struct" if isinstance(dt, Structure) else "union", "numFields": dt.getNumComponents(), "_links": {"self": make_link("/structs?name=%s" % dt.getName())}, - }) + } + if not grep_matches_item(item, grep_pattern): + continue + structs.append(item) count += 1 return {"success": True, "result": structs, "offset": offset, "limit": limit} diff --git a/src/ghydramcp/config.py b/src/ghydramcp/config.py index 56dcb13..c5c279a 100644 --- a/src/ghydramcp/config.py +++ b/src/ghydramcp/config.py @@ -72,6 +72,10 @@ class GhydraConfig: cursor_ttl_seconds: int = 300 # 5 minutes max_cursors_per_session: int = 100 + # Response size limits (for return_all guard) + max_response_tokens: int = 8000 # Hard budget — guard triggers above this + large_response_threshold: int = 4000 # Warn above this in normal pagination + # Expected API version expected_api_version: int = 2 diff --git a/src/ghydramcp/core/__init__.py b/src/ghydramcp/core/__init__.py index a19b5f4..74c6cec 100644 --- a/src/ghydramcp/core/__init__.py +++ b/src/ghydramcp/core/__init__.py @@ -24,6 +24,11 @@ from .progress import ( report_progress, report_step, ) +from .filtering import ( + project_fields, + apply_grep, + estimate_and_guard, +) from .logging import ( log_info, log_debug, @@ -50,6 +55,10 @@ __all__ = [ "ProgressReporter", "report_progress", "report_step", + # Filtering + "project_fields", + "apply_grep", + "estimate_and_guard", # Logging "log_info", "log_debug", diff --git a/src/ghydramcp/core/filtering.py b/src/ghydramcp/core/filtering.py new file mode 100644 index 0000000..f735881 --- /dev/null +++ b/src/ghydramcp/core/filtering.py @@ -0,0 +1,208 @@ +"""Field projection and response size guard for GhydraMCP. + +Provides jq-style field projection, grep filtering, and token budget +enforcement to prevent oversized MCP tool results. 
+""" + +import json +import re +import time +from typing import Any, Dict, Optional + +from ..config import get_config + + +# Token estimation (same ratio as pagination.py) +TOKEN_ESTIMATION_RATIO = 4.0 + + +def project_fields(items: list, fields: list[str]) -> list: + """Select only specified keys from each item (jq-style projection). + + Works on dicts and strings. For dicts, returns only the requested + keys. For non-dict items (e.g. lines of decompiled code), returns + them unchanged. + + Args: + items: List of items to project + fields: List of field names to keep + + Returns: + List of projected items + """ + if not fields or not items: + return items + + field_set = set(fields) + projected = [] + for item in items: + if isinstance(item, dict): + projected.append({k: v for k, v in item.items() if k in field_set}) + else: + projected.append(item) + return projected + + +def apply_grep(items: list, pattern: str, ignorecase: bool = True) -> list: + """Filter items by regex pattern across all string values. + + Searches all string-coercible values in each item. For dicts, + searches all values recursively. For strings, searches directly. 
+ + Args: + items: List of items to filter + pattern: Regex pattern string + ignorecase: Case-insensitive matching (default True) + + Returns: + Filtered list of matching items + """ + if not pattern or not items: + return items + + flags = re.IGNORECASE if ignorecase else 0 + compiled = re.compile(pattern, flags) + + return [item for item in items if _matches(item, compiled)] + + +def _matches(item: Any, pattern: re.Pattern, depth: int = 0) -> bool: + """Check if item matches pattern (recursive for nested structures).""" + if depth > 10: + return False + + if isinstance(item, dict): + for value in item.values(): + if isinstance(value, str) and pattern.search(value): + return True + elif isinstance(value, (int, float)): + if pattern.search(str(value)): + return True + elif isinstance(value, (dict, list, tuple)): + if _matches(value, pattern, depth + 1): + return True + return False + elif isinstance(item, (list, tuple)): + return any(_matches(i, pattern, depth + 1) for i in item) + elif isinstance(item, str): + return bool(pattern.search(item)) + else: + return bool(pattern.search(str(item))) + + +def _estimate_tokens(data: Any) -> int: + """Estimate token count from serialized JSON size.""" + text = json.dumps(data, default=str) + return int(len(text) / TOKEN_ESTIMATION_RATIO) + + +def _extract_available_fields(items: list) -> list[str]: + """Extract the set of field names from the first few dict items.""" + fields = set() + for item in items[:5]: + if isinstance(item, dict): + fields.update(item.keys()) + # Remove internal/HATEOAS fields + fields.discard("_links") + return sorted(fields) + + +def estimate_and_guard( + data: list, + tool_name: str, + budget: Optional[int] = None, + query_hints: Optional[Dict[str, Any]] = None, +) -> Dict[str, Any]: + """Check if data exceeds token budget; return guard response if so. + + If data fits within budget, returns None (caller should proceed + normally). 
If data exceeds budget, returns a structured summary + with instructions for narrowing the query. + + Args: + data: The full data list to check + tool_name: Name of the tool (for hint messages) + budget: Token budget override (defaults to config.max_response_tokens) + query_hints: Original query params (for building hint commands) + + Returns: + None if data fits within budget, or a guard response dict + """ + config = get_config() + if budget is None: + budget = config.max_response_tokens + + estimated = _estimate_tokens(data) + if estimated <= budget: + return None + + # Build sample from first 3 items + sample = data[:3] + available_fields = _extract_available_fields(data) + + # Build actionable hints based on the tool name + hints = _build_hints(tool_name, available_fields, query_hints) + + return { + "success": True, + "guarded": True, + "total_count": len(data), + "estimated_tokens": estimated, + "budget": budget, + "sample": sample, + "available_fields": available_fields, + "message": ( + "Response too large (%d items, ~%s tokens, budget: %s). " + "To read this data:\n%s" + ) % ( + len(data), + _format_tokens(estimated), + _format_tokens(budget), + hints, + ), + "timestamp": int(time.time() * 1000), + } + + +def _format_tokens(n: int) -> str: + """Format token count for display (e.g. 
45000 -> '45k').""" + if n >= 1000: + return "%dk" % (n // 1000) + return str(n) + + +def _build_hints( + tool_name: str, + available_fields: list[str], + query_hints: Optional[Dict[str, Any]] = None, +) -> str: + """Build actionable hint text for the guard message.""" + lines = [] + + # Pagination hint + lines.append( + " - Paginate: %s(page_size=50) then cursor_next(cursor_id='...')" + % tool_name + ) + + # Grep hint + grep_example = "main" if "functions" in tool_name else ".*pattern.*" + lines.append( + " - Filter: %s(grep='%s')" % (tool_name, grep_example) + ) + + # Fields hint (only if we have dict items with fields) + if available_fields: + short_fields = available_fields[:2] + lines.append( + " - Project: %s(fields=%s)" % (tool_name, short_fields) + ) + + # Combined hint + if available_fields: + lines.append( + " - Combine: %s(grep='...', fields=%s, return_all=True)" + % (tool_name, available_fields[:2]) + ) + + return "\n".join(lines) diff --git a/src/ghydramcp/core/pagination.py b/src/ghydramcp/core/pagination.py index e888667..2eea804 100644 --- a/src/ghydramcp/core/pagination.py +++ b/src/ghydramcp/core/pagination.py @@ -14,6 +14,7 @@ from threading import Lock from typing import Any, Dict, List, Optional, Tuple from ..config import get_config +from .filtering import project_fields, estimate_and_guard # ReDoS Protection Configuration @@ -393,8 +394,9 @@ def paginate_response( grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ) -> Dict[str, Any]: - """Create a paginated response with optional grep filtering. + """Create a paginated response with optional grep filtering and field projection. 
Args: data: Full result list to paginate @@ -404,7 +406,8 @@ def paginate_response( page_size: Items per page (default: 50, max: 500) grep: Optional regex pattern to filter results grep_ignorecase: Case-insensitive grep (default: True) - return_all: Bypass pagination and return all results (with warning) + return_all: Bypass pagination and return all results (with budget guard) + fields: Optional list of field names to project (jq-style) Returns: dict with pagination metadata and results @@ -431,6 +434,19 @@ def paginate_response( "timestamp": int(time.time() * 1000), } + # Apply field projection before size estimation + if fields: + filtered_data = project_fields(filtered_data, fields) + + # Check token budget — return guard if exceeded + guard = estimate_and_guard( + data=filtered_data, + tool_name=tool_name, + query_hints=query_params, + ) + if guard is not None: + return guard + estimated_tokens = estimate_tokens(filtered_data) warning = None @@ -438,7 +454,7 @@ def paginate_response( warning = f"EXTREMELY LARGE response (~{estimated_tokens:,} tokens)" elif estimated_tokens > 20000: warning = f"VERY LARGE response (~{estimated_tokens:,} tokens)" - elif estimated_tokens > 8000: + elif estimated_tokens > config.large_response_threshold: warning = f"Large response (~{estimated_tokens:,} tokens)" return { @@ -449,16 +465,19 @@ def paginate_response( "total_count": len(data), "filtered_count": len(filtered_data), "grep_pattern": grep, + "fields_projected": fields, "estimated_tokens": estimated_tokens, "warning": warning, }, "timestamp": int(time.time() * 1000), } - # Normal pagination flow + # Normal pagination flow — apply field projection before cursoring + paginated_data = project_fields(data, fields) if fields else data + try: cursor_id, state = cursor_manager.create_cursor( - data=data, + data=paginated_data, query_params=query_params, tool_name=tool_name, session_id=session_id, diff --git a/src/ghydramcp/mixins/analysis.py b/src/ghydramcp/mixins/analysis.py 
index 86126b4..1af360a 100644 --- a/src/ghydramcp/mixins/analysis.py +++ b/src/ghydramcp/mixins/analysis.py @@ -3,7 +3,7 @@ Provides tools for program analysis operations. """ -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool @@ -57,6 +57,7 @@ class AnalysisMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Get function call graph with edge pagination. @@ -70,6 +71,7 @@ class AnalysisMixin(GhydraMixinBase): grep: Regex pattern to filter edges grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all edges without pagination + fields: Field names to keep per edge (e.g. ['from', 'to']). Reduces response size. ctx: FastMCP context (auto-injected) Returns: @@ -115,7 +117,7 @@ class AnalysisMixin(GhydraMixinBase): } session_id = self._get_session_id(ctx) - paginated = self.paginate_response( + paginated = self.filtered_paginate( data=edges, query_params=query_params, tool_name="analysis_get_callgraph", @@ -124,9 +126,10 @@ class AnalysisMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, ) - if paginated.get("success"): + if paginated.get("success") and not paginated.get("guarded"): paginated["result"] = { "root_function": func_id, "max_depth": max_depth, @@ -148,6 +151,7 @@ class AnalysisMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Perform data flow analysis with step pagination. 
@@ -161,6 +165,7 @@ class AnalysisMixin(GhydraMixinBase): grep: Regex pattern to filter steps grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all steps without pagination + fields: Field names to keep per step. Reduces response size. ctx: FastMCP context (auto-injected) Returns: @@ -210,7 +215,7 @@ class AnalysisMixin(GhydraMixinBase): } session_id = self._get_session_id(ctx) - paginated = self.paginate_response( + paginated = self.filtered_paginate( data=steps, query_params=query_params, tool_name="analysis_get_dataflow", @@ -219,9 +224,11 @@ class AnalysisMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, ) - if paginated.get("success"): + # Merge metadata into result (skip if guarded) + if paginated.get("success") and not paginated.get("guarded"): paginated["result"] = { "start_address": address, "direction": direction, diff --git a/src/ghydramcp/mixins/base.py b/src/ghydramcp/mixins/base.py index 367c8e6..e849510 100644 --- a/src/ghydramcp/mixins/base.py +++ b/src/ghydramcp/mixins/base.py @@ -12,7 +12,7 @@ from fastmcp.contrib.mcp_mixin import MCPMixin from ..config import get_config from ..core.http_client import safe_get, safe_post, safe_put, safe_patch, safe_delete, simplify_response -from ..core.pagination import get_cursor_manager, paginate_response +from ..core.pagination import paginate_response from ..core.logging import log_info, log_debug, log_warning, log_error @@ -209,8 +209,9 @@ class GhydraMixinBase(MCPMixin): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[list] = None, ) -> Dict: - """Create paginated response.""" + """Create paginated response with optional field projection.""" return paginate_response( data=data, query_params=query_params, @@ -220,6 +221,37 @@ class GhydraMixinBase(MCPMixin): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, + ) + + def filtered_paginate( + 
self, + data: list, + query_params: Dict, + tool_name: str, + session_id: str = "default", + page_size: int = 50, + grep: Optional[str] = None, + grep_ignorecase: bool = True, + return_all: bool = False, + fields: Optional[list] = None, + ) -> Dict: + """Paginate with field projection and budget guard. + + Convenience wrapper that forwards every argument unchanged to + paginate_response, which performs the field projection and (for return_all) the budget guard. Prefer this over paginate_response for any + tool that could return large result sets. + """ + return self.paginate_response( + data=data, + query_params=query_params, + tool_name=tool_name, + session_id=session_id, + page_size=page_size, + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all, + fields=fields, ) # Async logging helpers diff --git a/src/ghydramcp/mixins/data.py b/src/ghydramcp/mixins/data.py index a63a73e..dde51ec 100644 --- a/src/ghydramcp/mixins/data.py +++ b/src/ghydramcp/mixins/data.py @@ -3,7 +3,7 @@ Provides tools for data items and strings operations. """ -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource @@ -34,6 +34,7 @@ class DataMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """List defined data items with filtering and cursor-based pagination. @@ -48,6 +49,7 @@ class DataMixin(GhydraMixinBase): grep: Regex pattern to filter results grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all results without pagination + fields: Field names to keep (e.g. ['address', 'name']). Reduces response size. 
ctx: FastMCP context (auto-injected) Returns: @@ -91,7 +93,7 @@ class DataMixin(GhydraMixinBase): } session_id = self._get_session_id(ctx) - return self.paginate_response( + return self.filtered_paginate( data=all_data, query_params=query_params, tool_name="data_list", @@ -100,6 +102,7 @@ class DataMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, ) @mcp_tool() @@ -111,6 +114,7 @@ class DataMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """List all defined strings in the binary with pagination. @@ -122,6 +126,7 @@ class DataMixin(GhydraMixinBase): grep: Regex pattern to filter results (e.g., "password|key") grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all strings without pagination + fields: Field names to keep (e.g. ['value', 'address']). Reduces response size. ctx: FastMCP context (auto-injected) Returns: @@ -157,7 +162,7 @@ class DataMixin(GhydraMixinBase): } session_id = self._get_session_id(ctx) - return self.paginate_response( + return self.filtered_paginate( data=result_data, query_params=query_params, tool_name="data_list_strings", @@ -166,6 +171,7 @@ class DataMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, ) @mcp_tool() diff --git a/src/ghydramcp/mixins/functions.py b/src/ghydramcp/mixins/functions.py index 73298d0..3eb129b 100644 --- a/src/ghydramcp/mixins/functions.py +++ b/src/ghydramcp/mixins/functions.py @@ -3,7 +3,7 @@ Provides tools for function analysis, decompilation, and manipulation. 
""" -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from urllib.parse import quote from fastmcp import Context @@ -33,6 +33,7 @@ class FunctionsMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """List functions with cursor-based pagination. @@ -43,6 +44,7 @@ class FunctionsMixin(GhydraMixinBase): grep: Regex pattern to filter function names grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all functions without pagination + fields: Field names to keep (e.g. ['name', 'address']). Reduces response size. ctx: FastMCP context (auto-injected) Returns: @@ -67,7 +69,7 @@ class FunctionsMixin(GhydraMixinBase): query_params = {"tool": "functions_list", "port": port, "grep": grep} session_id = self._get_session_id(ctx) - return self.paginate_response( + return self.filtered_paginate( data=functions, query_params=query_params, tool_name="functions_list", @@ -76,6 +78,7 @@ class FunctionsMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, ) @mcp_tool() @@ -129,6 +132,7 @@ class FunctionsMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Get decompiled code for a function with line pagination. 
@@ -143,6 +147,7 @@ class FunctionsMixin(GhydraMixinBase): grep: Regex pattern to filter lines grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all lines without pagination + fields: Field names to keep (for structured results) ctx: FastMCP context (auto-injected) Returns: @@ -194,7 +199,7 @@ class FunctionsMixin(GhydraMixinBase): } session_id = self._get_session_id(ctx) - paginated = self.paginate_response( + paginated = self.filtered_paginate( data=lines, query_params=query_params, tool_name="functions_decompile", @@ -203,10 +208,11 @@ class FunctionsMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, ) - # Convert lines back to text in result - if paginated.get("success"): + # Convert lines back to text in result (skip if guarded) + if paginated.get("success") and not paginated.get("guarded"): paginated["result"] = "\n".join(paginated.get("result", [])) paginated["function_name"] = result.get("name", name or address) @@ -222,6 +228,7 @@ class FunctionsMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Get disassembly for a function with instruction pagination. 
@@ -234,6 +241,7 @@ class FunctionsMixin(GhydraMixinBase): grep: Regex pattern to filter instructions grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all instructions without pagination + fields: Field names to keep (for structured results) ctx: FastMCP context (auto-injected) Returns: @@ -284,7 +292,7 @@ class FunctionsMixin(GhydraMixinBase): } session_id = self._get_session_id(ctx) - paginated = self.paginate_response( + paginated = self.filtered_paginate( data=lines, query_params=query_params, tool_name="functions_disassemble", @@ -293,10 +301,11 @@ class FunctionsMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, ) - # Convert lines back to text - if paginated.get("success"): + # Convert lines back to text (skip if guarded) + if paginated.get("success") and not paginated.get("guarded"): paginated["result"] = "\n".join(paginated.get("result", [])) paginated["function_name"] = result.get("name", name or address) diff --git a/src/ghydramcp/mixins/structs.py b/src/ghydramcp/mixins/structs.py index 28fda14..be427e1 100644 --- a/src/ghydramcp/mixins/structs.py +++ b/src/ghydramcp/mixins/structs.py @@ -3,7 +3,7 @@ Provides tools for struct data type operations. """ -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource @@ -31,6 +31,7 @@ class StructsMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """List all struct data types with cursor-based pagination. @@ -42,6 +43,7 @@ class StructsMixin(GhydraMixinBase): grep: Regex pattern to filter struct names grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all results without pagination + fields: Field names to keep (e.g. ['name', 'size']). 
Reduces response size. ctx: FastMCP context (auto-injected) Returns: @@ -76,7 +78,7 @@ class StructsMixin(GhydraMixinBase): } session_id = self._get_session_id(ctx) - return self.paginate_response( + return self.filtered_paginate( data=all_structs, query_params=query_params, tool_name="structs_list", @@ -85,6 +87,7 @@ class StructsMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, ) @mcp_tool() @@ -96,6 +99,7 @@ class StructsMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + project_fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Get detailed information about a struct with field pagination. @@ -107,6 +111,7 @@ class StructsMixin(GhydraMixinBase): grep: Regex pattern to filter fields grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all fields without pagination + project_fields: Field names to keep per struct field item. Reduces response size. 
ctx: FastMCP context (auto-injected) Returns: @@ -160,7 +165,7 @@ class StructsMixin(GhydraMixinBase): } # Paginate fields - paginated = self.paginate_response( + paginated = self.filtered_paginate( data=fields, query_params=query_params, tool_name="structs_get", @@ -169,10 +174,11 @@ class StructsMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=project_fields, ) - # Merge struct metadata with paginated fields - if paginated.get("success"): + # Merge struct metadata with paginated fields (skip if guarded) + if paginated.get("success") and not paginated.get("guarded"): paginated["struct_name"] = struct_info.get("name", name) paginated["struct_size"] = struct_info.get("size", struct_info.get("length")) paginated["struct_category"] = struct_info.get("category", struct_info.get("categoryPath")) diff --git a/src/ghydramcp/mixins/xrefs.py b/src/ghydramcp/mixins/xrefs.py index 1d17c21..04bbe33 100644 --- a/src/ghydramcp/mixins/xrefs.py +++ b/src/ghydramcp/mixins/xrefs.py @@ -3,7 +3,7 @@ Provides tools for cross-reference (xref) operations. """ -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource @@ -32,6 +32,7 @@ class XrefsMixin(GhydraMixinBase): grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, + fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """List cross-references with filtering and pagination. @@ -45,6 +46,7 @@ class XrefsMixin(GhydraMixinBase): grep: Regex pattern to filter results grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all results without pagination + fields: Field names to keep (e.g. ['fromAddress', 'toAddress']). Reduces response size. 
ctx: FastMCP context (auto-injected) Returns: @@ -94,7 +96,7 @@ class XrefsMixin(GhydraMixinBase): } session_id = self._get_session_id(ctx) - return self.paginate_response( + return self.filtered_paginate( data=all_xrefs, query_params=query_params, tool_name="xrefs_list", @@ -103,6 +105,7 @@ class XrefsMixin(GhydraMixinBase): grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, + fields=fields, ) # Resources