Ryan Malloy 0d25a0dc24 feat: Add symbols, segments, variables, namespaces mixins and search enhancements
New mixins wrapping existing Java HTTP endpoints:
- SymbolsMixin: symbols_list, symbols_imports, symbols_exports (+3 resources)
- SegmentsMixin: segments_list (+1 resource)
- VariablesMixin: variables_list, functions_variables (+1 resource)
- NamespacesMixin: namespaces_list, classes_list (+2 resources)

Additions to existing mixins:
- comments_get in AnalysisMixin (read complement to comments_set)
- program_info tool + resource in InstancesMixin

Search enhancements (Sprint 2):
- functions_list now passes name_contains, name_regex, addr to Java API
  for server-side filtering on large binaries

Brings tool count from 42 to 52 (excl. feedback), resources from 11 to 19.
2026-01-31 10:05:50 -07:00

200 lines
6.1 KiB
Python

"""Cross-references mixin for GhydraMCP.
Provides tools for cross-reference (xref) operations.
"""
from typing import Any, Dict, List, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool
from ..config import get_config
from .base import GhydraMixinBase
class XrefsMixin(GhydraMixinBase):
    """Mixin for cross-reference operations.

    Provides tools for:
    - Listing references to an address
    - Listing references from an address
    - Filtering by reference type
    """

    @mcp_tool()
    def xrefs_list(
        self,
        to_addr: Optional[str] = None,
        from_addr: Optional[str] = None,
        type: Optional[str] = None,
        port: Optional[int] = None,
        page_size: int = 50,
        grep: Optional[str] = None,
        grep_ignorecase: bool = True,
        return_all: bool = False,
        fields: Optional[List[str]] = None,
        ctx: Optional[Context] = None,
    ) -> Dict[str, Any]:
        """List cross-references with filtering and pagination.

        Args:
            to_addr: Filter references to this address (hex)
            from_addr: Filter references from this address (hex)
            type: Filter by reference type ("CALL", "READ", "WRITE", etc.)
            port: Ghidra instance port (optional)
            page_size: Items per page (default: 50, max: 500)
            grep: Regex pattern to filter results
            grep_ignorecase: Case-insensitive grep (default: True)
            return_all: Return all results without pagination
            fields: Field names to keep (e.g. ['fromAddress', 'toAddress']). Reduces response size.
            ctx: FastMCP context (auto-injected)

        Returns:
            Cross-references with pagination metadata
        """
        # At least one side of the reference must be pinned down.
        if not to_addr and not from_addr:
            return {
                "success": False,
                "error": {
                    "code": "MISSING_PARAMETER",
                    "message": "Either to_addr or from_addr parameter is required",
                },
            }

        try:
            port = self.get_instance_port(port)
        except ValueError as e:
            return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}

        config = get_config()

        # Fetch one large batch up front; paging and grep filtering are then
        # applied to that batch by filtered_paginate below.
        params: Dict[str, Any] = {"offset": 0, "limit": 10000}
        if to_addr:
            params["to_addr"] = to_addr
        if from_addr:
            params["from_addr"] = from_addr
        if type:
            params["type"] = type

        response = self.safe_get(port, "xrefs", params)
        simplified = self.simplify_response(response)

        # A missing "success" key is treated as success; an explicit falsy
        # value means the payload is an error and is returned unchanged.
        if not simplified.get("success", True):
            return simplified

        all_xrefs = simplified.get("result", [])
        if not isinstance(all_xrefs, list):
            all_xrefs = []

        # Describes the query that produced this result set.
        query_params = {
            "tool": "xrefs_list",
            "port": port,
            "to_addr": to_addr,
            "from_addr": from_addr,
            "type": type,
            "grep": grep,
        }
        session_id = self._get_session_id(ctx)

        return self.filtered_paginate(
            data=all_xrefs,
            query_params=query_params,
            tool_name="xrefs_list",
            session_id=session_id,
            # Never exceed the configured page-size ceiling.
            page_size=min(page_size, config.max_page_size),
            grep=grep,
            grep_ignorecase=grep_ignorecase,
            return_all=return_all,
            fields=fields,
        )

    # Resources

    def _xrefs_resource(
        self,
        port: Optional[int],
        address: Optional[str],
        addr_param: str,
        result_key: str,
    ) -> Dict[str, Any]:
        """Build the capped payload shared by the xrefs resources.

        Args:
            port: Ghidra instance port
            address: Address to query
            addr_param: Query parameter carrying the address
                ("to_addr" or "from_addr")
            result_key: Key under which the references are returned
                ("xrefs_to" or "xrefs_from")

        Returns:
            Dict with the address, the (possibly truncated) references, their
            count, and cap information; or an error dict.
        """
        if not address:
            return {"error": "Address is required"}

        try:
            port = self.get_instance_port(port)
        except ValueError as e:
            return {"error": str(e)}

        config = get_config()
        cap = config.resource_caps.get("xrefs", 1000)

        response = self.safe_get(port, "xrefs", {addr_param: address, "limit": cap})
        simplified = self.simplify_response(response)

        if not simplified.get("success", True):
            return simplified

        xrefs = simplified.get("result", [])
        if not isinstance(xrefs, list):
            xrefs = []

        # Decide "capped" from what was received, then truncate so that the
        # reported count always matches the number of items actually returned.
        capped = len(xrefs) >= cap
        xrefs = xrefs[:cap]

        return {
            "address": address,
            result_key: xrefs,
            "count": len(xrefs),
            "capped_at": cap if capped else None,
            "_hint": f"Use xrefs_list({addr_param}=...) for full pagination" if capped else None,
        }

    @mcp_resource(uri="ghidra://instance/{port}/xrefs/to/{address}")
    def resource_xrefs_to(
        self,
        port: Optional[int] = None,
        address: Optional[str] = None,
    ) -> Dict[str, Any]:
        """MCP Resource: Get references to an address (capped).

        Args:
            port: Ghidra instance port
            address: Target address

        Returns:
            References to the address (capped at the configured "xrefs"
            resource cap, 1000 if not configured)
        """
        return self._xrefs_resource(port, address, "to_addr", "xrefs_to")

    @mcp_resource(uri="ghidra://instance/{port}/xrefs/from/{address}")
    def resource_xrefs_from(
        self,
        port: Optional[int] = None,
        address: Optional[str] = None,
    ) -> Dict[str, Any]:
        """MCP Resource: Get references from an address (capped).

        Args:
            port: Ghidra instance port
            address: Source address

        Returns:
            References from the address (capped at the configured "xrefs"
            resource cap, 1000 if not configured)
        """
        return self._xrefs_resource(port, address, "from_addr", "xrefs_from")