Ryan Malloy 0d25a0dc24 feat: Add symbols, segments, variables, namespaces mixins and search enhancements
New mixins wrapping existing Java HTTP endpoints:
- SymbolsMixin: symbols_list, symbols_imports, symbols_exports (+3 resources)
- SegmentsMixin: segments_list (+1 resource)
- VariablesMixin: variables_list, functions_variables (+1 resource)
- NamespacesMixin: namespaces_list, classes_list (+2 resources)

Additions to existing mixins:
- comments_get in AnalysisMixin (read complement to comments_set)
- program_info tool + resource in InstancesMixin

Search enhancements (Sprint 2):
- functions_list now passes name_contains, name_regex, addr to Java API
  for server-side filtering on large binaries

Brings tool count from 42 to 52 (excl. feedback), resources from 11 to 19.
2026-01-31 10:05:50 -07:00

391 lines
12 KiB
Python

"""Data mixin for GhydraMCP.
Provides tools for working with data items and strings.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class DataMixin(GhydraMixinBase):
    """Mixin for data operations.

    Provides tools for:
    - Listing and searching data items
    - Creating and modifying data
    - Working with strings
    - Setting data types

    Also exposes capped, read-only resources for strings and data items.
    """
@mcp_tool()
def data_list(
    self,
    addr: Optional[str] = None,
    name: Optional[str] = None,
    name_contains: Optional[str] = None,
    type: Optional[str] = None,
    port: Optional[int] = None,
    page_size: int = 50,
    grep: Optional[str] = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    fields: Optional[List[str]] = None,
    ctx: Optional[Context] = None,
) -> Dict[str, Any]:
    """List defined data items, filtered and paginated.

    Up to 10000 items are requested from the instance's ``data`` endpoint
    in a single call (offset 0). The fetched list, together with the grep,
    field-selection and paging options, is then handed to
    ``filtered_paginate``.

    Args:
        addr: Filter by address (hexadecimal)
        name: Exact name match filter (case-sensitive)
        name_contains: Substring name filter (case-insensitive)
        type: Filter by data type (e.g. "string", "dword")
        port: Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500)
        grep: Regex pattern to filter results
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all results without pagination
        fields: Field names to keep (e.g. ['address', 'name']). Reduces response size.
        ctx: FastMCP context (auto-injected)

    Returns:
        Data items with pagination metadata, or an error dict when no
        instance port can be resolved or the request is reported as failed.
    """
    try:
        port = self.get_instance_port(port)
    except ValueError as exc:
        return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(exc)}}

    config = get_config()

    # The same four filters feed both the request and the pagination query
    # key, so keep them together in one ordered mapping.
    name_filters = {
        "addr": addr,
        "name": name,
        "name_contains": name_contains,
        "type": type,
    }

    # Only filters with a truthy value are sent to the instance.
    request_params = {"offset": 0, "limit": 10000}
    request_params.update(
        {key: value for key, value in name_filters.items() if value}
    )

    simplified = self.simplify_response(self.safe_get(port, "data", request_params))
    succeeded = simplified.get("success", True)
    if not succeeded:
        return simplified

    # Anything other than a list result is treated as "no items".
    fetched = simplified.get("result", [])
    items = fetched if isinstance(fetched, list) else []

    query_key = {"tool": "data_list", "port": port, **name_filters, "grep": grep}
    session = self._get_session_id(ctx)
    effective_page_size = min(page_size, config.max_page_size)

    return self.filtered_paginate(
        data=items,
        query_params=query_key,
        tool_name="data_list",
        session_id=session,
        page_size=effective_page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all,
        fields=fields,
    )
@mcp_tool()
def data_list_strings(
    self,
    filter: Optional[str] = None,
    port: Optional[int] = None,
    page_size: int = 50,
    grep: Optional[str] = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    fields: Optional[List[str]] = None,
    ctx: Optional[Context] = None,
) -> Dict[str, Any]:
    """List all defined strings in the binary with pagination.

    Strings are requested from the instance's ``strings`` endpoint in a
    single call (offset 0) and the fetched list is handed to
    ``filtered_paginate`` together with the grep, field-selection and
    paging options.

    The number of rows requested is derived from the *clamped* page size
    (ten pages' worth, at least 2000) and never exceeds the 10000 rows
    requested for ``return_all``. Previously the raw ``page_size`` was
    used, so an oversized value could request an unbounded number of rows.

    Args:
        filter: Server-side string content filter
        port: Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500)
        grep: Regex pattern to filter results (e.g., "password|key")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all strings without pagination
        fields: Field names to keep (e.g. ['value', 'address']). Reduces response size.
        ctx: FastMCP context (auto-injected)

    Returns:
        List of strings with pagination info, or an error dict when no
        instance port can be resolved or the request is reported as failed.
        A non-list result is returned unchanged.
    """
    try:
        port = self.get_instance_port(port)
    except ValueError as e:
        return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}

    config = get_config()

    # Clamp before sizing the fetch window so the window tracks the page
    # size that is actually used for pagination below.
    effective_page_size = min(page_size, config.max_page_size)

    max_fetch = 10000  # rows requested when return_all is set
    min_fetch = 2000  # floor for paginated requests
    if return_all:
        fetch_limit = max_fetch
    else:
        # A paginated request must never ask for more than return_all does.
        fetch_limit = min(max(effective_page_size * 10, min_fetch), max_fetch)

    params = {"offset": 0, "limit": fetch_limit}
    if filter:
        params["filter"] = filter

    response = self.safe_get(port, "strings", params)
    simplified = self.simplify_response(response)
    if not simplified.get("success", True):
        return simplified

    result_data = simplified.get("result", [])
    if not isinstance(result_data, list):
        # Nothing to paginate; pass the simplified response through as-is.
        return simplified

    query_params = {
        "tool": "data_list_strings",
        "port": port,
        "filter": filter,
        "grep": grep,
    }
    session_id = self._get_session_id(ctx)

    return self.filtered_paginate(
        data=result_data,
        query_params=query_params,
        tool_name="data_list_strings",
        session_id=session_id,
        page_size=effective_page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all,
        fields=fields,
    )
@mcp_tool()
def data_create(
    self,
    address: str,
    data_type: str,
    size: Optional[int] = None,
    port: Optional[int] = None,
) -> Dict[str, Any]:
    """Define a new data item at the specified address.

    Posts the address and type (plus ``size`` when one is given) to the
    instance's ``data`` endpoint.

    Args:
        address: Memory address in hex format
        data_type: Data type (e.g. "string", "dword", "byte")
        size: Optional size in bytes
        port: Ghidra instance port (optional)

    Returns:
        Operation result with created data information, or an error dict
        when a required parameter is empty or no instance port can be
        resolved.
    """
    # Both values must be non-empty before anything else is attempted.
    if not (address and data_type):
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and data_type parameters are required",
            },
        }

    try:
        port = self.get_instance_port(port)
    except ValueError as exc:
        return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(exc)}}

    body = {"address": address, "type": data_type}
    if size is not None:
        body.update(size=size)

    return self.simplify_response(self.safe_post(port, "data", body))
@mcp_tool()
def data_rename(
    self,
    address: str,
    name: str,
    port: Optional[int] = None,
) -> Dict[str, Any]:
    """Rename a data item.

    Posts the address and the new name (as ``newName``) to the instance's
    ``data`` endpoint.

    Args:
        address: Memory address in hex format
        name: New name for the data item
        port: Ghidra instance port (optional)

    Returns:
        Operation result, or an error dict when a required parameter is
        empty or no instance port can be resolved.
    """
    # Both values must be non-empty before anything else is attempted.
    if not (address and name):
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and name parameters are required",
            },
        }

    try:
        port = self.get_instance_port(port)
    except ValueError as exc:
        return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(exc)}}

    body = {"address": address, "newName": name}
    return self.simplify_response(self.safe_post(port, "data", body))
@mcp_tool()
def data_delete(
    self,
    address: str,
    port: Optional[int] = None,
) -> Dict[str, Any]:
    """Delete data at the specified address.

    Posts the address with ``action`` set to ``"delete"`` to the
    instance's ``data/delete`` endpoint.

    Args:
        address: Memory address in hex format
        port: Ghidra instance port (optional)

    Returns:
        Operation result, or an error dict when the address is empty or no
        instance port can be resolved.
    """
    if not address:
        missing_address = {
            "code": "MISSING_PARAMETER",
            "message": "Address parameter is required",
        }
        return {"success": False, "error": missing_address}

    try:
        port = self.get_instance_port(port)
    except ValueError as exc:
        return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(exc)}}

    body = {"address": address, "action": "delete"}
    return self.simplify_response(self.safe_post(port, "data/delete", body))
@mcp_tool()
def data_set_type(
    self,
    address: str,
    data_type: str,
    port: Optional[int] = None,
) -> Dict[str, Any]:
    """Set the data type of a data item.

    Posts the address and type to the instance's ``data/type`` endpoint.

    Args:
        address: Memory address in hex format
        data_type: Data type name (e.g. "uint32_t", "char[10]")
        port: Ghidra instance port (optional)

    Returns:
        Operation result, or an error dict when a required parameter is
        empty or no instance port can be resolved.
    """
    # Both values must be non-empty before anything else is attempted.
    if not (address and data_type):
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and data_type parameters are required",
            },
        }

    try:
        port = self.get_instance_port(port)
    except ValueError as exc:
        return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(exc)}}

    body = {"address": address, "type": data_type}
    return self.simplify_response(self.safe_post(port, "data/type", body))
# Resources
@mcp_resource(uri="ghidra://instance/{port}/strings")
def resource_strings_list(self, port: Optional[int] = None) -> Dict[str, Any]:
    """MCP Resource: List strings (capped).

    The cap is read from ``config.resource_caps["strings"]`` and defaults
    to 1000; it is used both as the request limit and as the maximum
    number of strings returned.

    Args:
        port: Ghidra instance port

    Returns:
        Dict with the strings, the number received, and, once the cap is
        reached, the cap value and a hint pointing at the paginated tool
        (both ``None`` otherwise). An error dict is returned when no
        instance port can be resolved or the request is reported as failed.
    """
    try:
        port = self.get_instance_port(port)
    except ValueError as exc:
        return {"error": str(exc)}

    limit = get_config().resource_caps.get("strings", 1000)

    simplified = self.simplify_response(
        self.safe_get(port, "strings", {"limit": limit})
    )
    if not simplified.get("success", True):
        return simplified

    # Anything other than a list result is treated as "no strings".
    received = simplified.get("result", [])
    if not isinstance(received, list):
        received = []

    received_count = len(received)
    cap_reached = received_count >= limit

    hint = None
    if cap_reached:
        hint = "Use data_list_strings() tool for full pagination"

    return {
        "strings": received[:limit],
        "count": received_count,
        "capped_at": limit if cap_reached else None,
        "_hint": hint,
    }
@mcp_resource(uri="ghidra://instance/{port}/data")
def resource_data_list(self, port: Optional[int] = None) -> Dict[str, Any]:
    """MCP Resource: List data items (capped).

    The cap is read from ``config.resource_caps["data"]`` and defaults to
    1000; it is used both as the request limit and as the maximum number
    of items returned.

    Args:
        port: Ghidra instance port

    Returns:
        Dict with the data items, the number received, and, once the cap
        is reached, the cap value and a hint pointing at the paginated
        tool (both ``None`` otherwise). An error dict is returned when no
        instance port can be resolved or the request is reported as failed.
    """
    try:
        port = self.get_instance_port(port)
    except ValueError as exc:
        return {"error": str(exc)}

    limit = get_config().resource_caps.get("data", 1000)

    simplified = self.simplify_response(
        self.safe_get(port, "data", {"limit": limit})
    )
    if not simplified.get("success", True):
        return simplified

    # Anything other than a list result is treated as "no items".
    received = simplified.get("result", [])
    if not isinstance(received, list):
        received = []

    received_count = len(received)
    cap_reached = received_count >= limit

    hint = None
    if cap_reached:
        hint = "Use data_list() tool for full pagination"

    return {
        "data": received[:limit],
        "count": received_count,
        "capped_at": limit if cap_reached else None,
        "_hint": hint,
    }