return_all=True on large binaries (1800+ functions) produced 72K-character responses that exceeded the MCP tool result limit. Instead of truncating, oversized responses now return a structured summary with sample data, available fields, and actionable instructions for narrowing the query.

Three layers of filtering:
- Server-side grep: Jython HTTP handlers filter during Ghidra iteration
- Field projection: jq-style key selection strips unneeded fields
- Token budget guard: responses exceeding 8k tokens return a summary

New files: core/filtering.py (project_fields, apply_grep, estimate_and_guard)
Modified: config, pagination, base mixin, all 5 domain mixins, headless server
391 lines
12 KiB
Python
391 lines
12 KiB
Python
"""Data mixin for GhydraMCP.
|
|
|
|
Provides tools for data items and strings operations.
|
|
"""
|
|
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
from fastmcp import Context
|
|
from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource
|
|
|
|
from .base import GhydraMixinBase
|
|
from ..config import get_config
|
|
|
|
|
|
class DataMixin(GhydraMixinBase):
    """Mixin for data operations.

    Provides tools for:
    - Listing and searching data items
    - Creating and modifying data
    - Working with strings
    - Setting data types

    Every tool resolves the target Ghidra instance with
    ``get_instance_port`` and talks to it through the ``safe_get`` /
    ``safe_post`` / ``simplify_response`` helpers inherited from the base
    mixin.
    """

    # Upper bound on the number of items requested from the Ghidra HTTP API
    # in a single call by the list tools.
    _DATA_MAX_FETCH = 10000
    # Smallest batch data_list_strings requests when it is paginating.
    _STRINGS_MIN_FETCH = 2000

    @staticmethod
    def _data_error(code: str, message: str) -> Dict[str, Any]:
        """Build the ``{"success": False, "error": {...}}`` envelope used by the tools."""
        return {"success": False, "error": {"code": code, "message": message}}

    def _capped_resource(
        self, port: Optional[int], endpoint: str, tool_name: str
    ) -> Dict[str, Any]:
        """Fetch a capped listing for an MCP resource.

        ``endpoint`` is used three ways, because the visible resources use
        the same word for all of them: as the HTTP endpoint, as the key into
        ``config.resource_caps`` and as the key of the returned item list.

        Args:
            port: Ghidra instance port (resolved via ``get_instance_port``)
            endpoint: Endpoint / cap key / result key ("strings" or "data")
            tool_name: Name of the paginating tool mentioned in the hint

        Returns:
            Dict with the item list, ``count``, ``capped_at`` and ``_hint``;
            ``{"error": ...}`` when no instance can be resolved; or the
            simplified server response when it reports failure.
        """
        try:
            port = self.get_instance_port(port)
        except ValueError as e:
            return {"error": str(e)}

        cap = get_config().resource_caps.get(endpoint, 1000)

        response = self.safe_get(port, endpoint, {"limit": cap})
        simplified = self.simplify_response(response)
        if not simplified.get("success", True):
            return simplified

        items = simplified.get("result", [])
        if not isinstance(items, list):
            items = []

        # The cap is considered hit as soon as the server returned at least
        # `cap` items; only then are the cap and the hint reported.
        hit_cap = len(items) >= cap
        return {
            endpoint: items[:cap],
            "count": len(items),
            "capped_at": cap if hit_cap else None,
            "_hint": f"Use {tool_name}() tool for full pagination" if hit_cap else None,
        }

    @mcp_tool()
    def data_list(
        self,
        addr: Optional[str] = None,
        name: Optional[str] = None,
        name_contains: Optional[str] = None,
        type: Optional[str] = None,
        port: Optional[int] = None,
        page_size: int = 50,
        grep: Optional[str] = None,
        grep_ignorecase: bool = True,
        return_all: bool = False,
        fields: Optional[List[str]] = None,
        ctx: Optional[Context] = None,
    ) -> Dict[str, Any]:
        """List defined data items with filtering and cursor-based pagination.

        Args:
            addr: Filter by address (hexadecimal)
            name: Exact name match filter (case-sensitive)
            name_contains: Substring name filter (case-insensitive)
            type: Filter by data type (e.g. "string", "dword")
            port: Ghidra instance port (optional)
            page_size: Items per page (default: 50; capped at the configured
                ``max_page_size``)
            grep: Regex pattern to filter results
            grep_ignorecase: Case-insensitive grep (default: True)
            return_all: Return all results without pagination
            fields: Field names to keep (e.g. ['address', 'name']). Reduces response size.
            ctx: FastMCP context (auto-injected)

        Returns:
            Data items with pagination metadata, or an error envelope when
            no instance is available or the server reports failure.
        """
        try:
            port = self.get_instance_port(port)
        except ValueError as e:
            return self._data_error("NO_INSTANCE", str(e))

        config = get_config()

        # Only forward the filters the caller actually supplied.
        params: Dict[str, Any] = {"offset": 0, "limit": self._DATA_MAX_FETCH}
        optional_filters = {
            "addr": addr,
            "name": name,
            "name_contains": name_contains,
            "type": type,
        }
        params.update({key: value for key, value in optional_filters.items() if value})

        response = self.safe_get(port, "data", params)
        simplified = self.simplify_response(response)
        if not simplified.get("success", True):
            return simplified

        all_data = simplified.get("result", [])
        if not isinstance(all_data, list):
            # A non-list result is treated as "no items".
            all_data = []

        query_params = {
            "tool": "data_list",
            "port": port,
            "addr": addr,
            "name": name,
            "name_contains": name_contains,
            "type": type,
            "grep": grep,
        }
        session_id = self._get_session_id(ctx)

        return self.filtered_paginate(
            data=all_data,
            query_params=query_params,
            tool_name="data_list",
            session_id=session_id,
            page_size=min(page_size, config.max_page_size),
            grep=grep,
            grep_ignorecase=grep_ignorecase,
            return_all=return_all,
            fields=fields,
        )

    @mcp_tool()
    def data_list_strings(
        self,
        filter: Optional[str] = None,
        port: Optional[int] = None,
        page_size: int = 50,
        grep: Optional[str] = None,
        grep_ignorecase: bool = True,
        return_all: bool = False,
        fields: Optional[List[str]] = None,
        ctx: Optional[Context] = None,
    ) -> Dict[str, Any]:
        """List all defined strings in the binary with pagination.

        Args:
            filter: Server-side string content filter
            port: Ghidra instance port (optional)
            page_size: Items per page (default: 50; capped at the configured
                ``max_page_size``)
            grep: Regex pattern to filter results (e.g., "password|key")
            grep_ignorecase: Case-insensitive grep (default: True)
            return_all: Return all strings without pagination
            fields: Field names to keep (e.g. ['value', 'address']). Reduces response size.
            ctx: FastMCP context (auto-injected)

        Returns:
            List of strings with pagination info. A failed or non-list
            server response is returned as simplified, unchanged.
        """
        try:
            port = self.get_instance_port(port)
        except ValueError as e:
            return self._data_error("NO_INSTANCE", str(e))

        config = get_config()

        # When paginating, fetch roughly ten pages' worth (at least
        # _STRINGS_MIN_FETCH). page_size is caller-supplied and not yet
        # clamped here, so bound the request by the same ceiling return_all
        # uses; otherwise a huge page_size would ask for more than "all".
        if return_all:
            fetch_limit = self._DATA_MAX_FETCH
        else:
            fetch_limit = min(
                self._DATA_MAX_FETCH,
                max(page_size * 10, self._STRINGS_MIN_FETCH),
            )

        params: Dict[str, Any] = {"offset": 0, "limit": fetch_limit}
        if filter:
            params["filter"] = filter

        response = self.safe_get(port, "strings", params)
        simplified = self.simplify_response(response)
        if not simplified.get("success", True):
            return simplified

        result_data = simplified.get("result", [])
        if not isinstance(result_data, list):
            return simplified

        query_params = {
            "tool": "data_list_strings",
            "port": port,
            "filter": filter,
            "grep": grep,
        }
        session_id = self._get_session_id(ctx)

        return self.filtered_paginate(
            data=result_data,
            query_params=query_params,
            tool_name="data_list_strings",
            session_id=session_id,
            page_size=min(page_size, config.max_page_size),
            grep=grep,
            grep_ignorecase=grep_ignorecase,
            return_all=return_all,
            fields=fields,
        )

    @mcp_tool()
    def data_create(
        self,
        address: str,
        data_type: str,
        size: Optional[int] = None,
        port: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Define a new data item at the specified address.

        Args:
            address: Memory address in hex format
            data_type: Data type (e.g. "string", "dword", "byte")
            size: Optional size in bytes
            port: Ghidra instance port (optional)

        Returns:
            Operation result with created data information, or an error
            envelope when a required parameter is empty or no instance is
            available.
        """
        if not address or not data_type:
            return self._data_error(
                "MISSING_PARAMETER",
                "Address and data_type parameters are required",
            )

        try:
            port = self.get_instance_port(port)
        except ValueError as e:
            return self._data_error("NO_INSTANCE", str(e))

        payload: Dict[str, Any] = {"address": address, "type": data_type}
        if size is not None:
            payload["size"] = size

        response = self.safe_post(port, "data", payload)
        return self.simplify_response(response)

    @mcp_tool()
    def data_rename(
        self,
        address: str,
        name: str,
        port: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Rename a data item.

        Args:
            address: Memory address in hex format
            name: New name for the data item
            port: Ghidra instance port (optional)

        Returns:
            Operation result, or an error envelope when a required parameter
            is empty or no instance is available.
        """
        if not address or not name:
            return self._data_error(
                "MISSING_PARAMETER",
                "Address and name parameters are required",
            )

        try:
            port = self.get_instance_port(port)
        except ValueError as e:
            return self._data_error("NO_INSTANCE", str(e))

        # The new name is sent under the "newName" key of the "data" endpoint.
        payload = {"address": address, "newName": name}
        response = self.safe_post(port, "data", payload)
        return self.simplify_response(response)

    @mcp_tool()
    def data_delete(
        self,
        address: str,
        port: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Delete data at the specified address.

        Args:
            address: Memory address in hex format
            port: Ghidra instance port (optional)

        Returns:
            Operation result, or an error envelope when the address is empty
            or no instance is available.
        """
        if not address:
            return self._data_error(
                "MISSING_PARAMETER",
                "Address parameter is required",
            )

        try:
            port = self.get_instance_port(port)
        except ValueError as e:
            return self._data_error("NO_INSTANCE", str(e))

        payload = {"address": address, "action": "delete"}
        response = self.safe_post(port, "data/delete", payload)
        return self.simplify_response(response)

    @mcp_tool()
    def data_set_type(
        self,
        address: str,
        data_type: str,
        port: Optional[int] = None,
    ) -> Dict[str, Any]:
        """Set the data type of a data item.

        Args:
            address: Memory address in hex format
            data_type: Data type name (e.g. "uint32_t", "char[10]")
            port: Ghidra instance port (optional)

        Returns:
            Operation result, or an error envelope when a required parameter
            is empty or no instance is available.
        """
        if not address or not data_type:
            return self._data_error(
                "MISSING_PARAMETER",
                "Address and data_type parameters are required",
            )

        try:
            port = self.get_instance_port(port)
        except ValueError as e:
            return self._data_error("NO_INSTANCE", str(e))

        payload = {"address": address, "type": data_type}
        response = self.safe_post(port, "data/type", payload)
        return self.simplify_response(response)

    # Resources

    @mcp_resource(uri="ghidra://instance/{port}/strings")
    def resource_strings_list(self, port: Optional[int] = None) -> Dict[str, Any]:
        """MCP Resource: List strings (capped).

        Args:
            port: Ghidra instance port

        Returns:
            List of strings, capped at ``config.resource_caps["strings"]``
            (1000 when that key is not configured)
        """
        return self._capped_resource(port, "strings", "data_list_strings")

    @mcp_resource(uri="ghidra://instance/{port}/data")
    def resource_data_list(self, port: Optional[int] = None) -> Dict[str, Any]:
        """MCP Resource: List data items (capped).

        Args:
            port: Ghidra instance port

        Returns:
            List of data items, capped at ``config.resource_caps["data"]``
            (1000 when that key is not configured)
        """
        return self._capped_resource(port, "data", "data_list")