Ryan Malloy 28b81ff359
Some checks are pending
Build Ghidra Plugin / build (push) Waiting to run
feat: Add Python MCP bridge and build tooling
- Add ghydramcp Python package with FastMCP server implementation
- Add docker-compose.yml for easy container management
- Add Makefile with build/run targets
- Add QUICKSTART.md for getting started
- Add uv.lock for reproducible dependencies
2026-01-26 13:51:12 -07:00

357 lines
11 KiB
Python

"""Analysis mixin for GhydraMCP.
Provides tools for program analysis operations.
"""
from typing import Any, Dict, Optional
from fastmcp import Context
from fastmcp.contrib.mcp_mixin import mcp_tool
from .base import GhydraMixinBase
from ..config import get_config
class AnalysisMixin(GhydraMixinBase):
"""Mixin for analysis operations.
Provides tools for:
- Running program analysis
- Call graph analysis
- Data flow analysis
- UI state queries
- Comment management
"""
@mcp_tool()
def analysis_run(
self,
port: Optional[int] = None,
analysis_options: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""Run analysis on the current program.
Args:
port: Ghidra instance port (optional)
analysis_options: Analysis options to enable/disable
Returns:
Analysis operation result
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_post(port, "analysis", analysis_options or {})
return self.simplify_response(response)
@mcp_tool()
def analysis_get_callgraph(
self,
name: Optional[str] = None,
address: Optional[str] = None,
max_depth: int = 3,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""Get function call graph with edge pagination.
Args:
name: Starting function name (mutually exclusive with address)
address: Starting function address
max_depth: Maximum call depth (default: 3)
port: Ghidra instance port (optional)
page_size: Edges per page (default: 50, max: 500)
grep: Regex pattern to filter edges
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all edges without pagination
ctx: FastMCP context (auto-injected)
Returns:
Call graph with paginated edges
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
params = {"max_depth": max_depth}
if address:
params["address"] = address
func_id = address
elif name:
params["name"] = name
func_id = name
else:
func_id = "entry_point"
response = self.safe_get(port, "analysis/callgraph", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
result = simplified.get("result", {})
edges = result.get("edges", []) if isinstance(result, dict) else []
nodes = result.get("nodes", []) if isinstance(result, dict) else []
if not edges:
return simplified
query_params = {
"tool": "analysis_get_callgraph",
"port": port,
"name": name,
"address": address,
"max_depth": max_depth,
"grep": grep,
}
session_id = self._get_session_id(ctx)
paginated = self.paginate_response(
data=edges,
query_params=query_params,
tool_name="analysis_get_callgraph",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
)
if paginated.get("success"):
paginated["result"] = {
"root_function": func_id,
"max_depth": max_depth,
"nodes": nodes,
"edges": paginated.get("result", []),
"total_nodes": len(nodes),
}
return paginated
@mcp_tool()
def analysis_get_dataflow(
self,
address: str,
direction: str = "forward",
max_steps: int = 50,
port: Optional[int] = None,
page_size: int = 50,
grep: Optional[str] = None,
grep_ignorecase: bool = True,
return_all: bool = False,
ctx: Optional[Context] = None,
) -> Dict[str, Any]:
"""Perform data flow analysis with step pagination.
Args:
address: Starting address in hex format
direction: "forward" or "backward" (default: "forward")
max_steps: Maximum analysis steps (default: 50)
port: Ghidra instance port (optional)
page_size: Steps per page (default: 50, max: 500)
grep: Regex pattern to filter steps
grep_ignorecase: Case-insensitive grep (default: True)
return_all: Return all steps without pagination
ctx: FastMCP context (auto-injected)
Returns:
Data flow steps with pagination
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
config = get_config()
params = {
"address": address,
"direction": direction,
"max_steps": max_steps,
}
response = self.safe_get(port, "analysis/dataflow", params)
simplified = self.simplify_response(response)
if not simplified.get("success", True):
return simplified
result = simplified.get("result", {})
steps = result.get("steps", []) if isinstance(result, dict) else []
if not steps:
return simplified
query_params = {
"tool": "analysis_get_dataflow",
"port": port,
"address": address,
"direction": direction,
"max_steps": max_steps,
"grep": grep,
}
session_id = self._get_session_id(ctx)
paginated = self.paginate_response(
data=steps,
query_params=query_params,
tool_name="analysis_get_dataflow",
session_id=session_id,
page_size=min(page_size, config.max_page_size),
grep=grep,
grep_ignorecase=grep_ignorecase,
return_all=return_all,
)
if paginated.get("success"):
paginated["result"] = {
"start_address": address,
"direction": direction,
"steps": paginated.get("result", []),
}
if isinstance(result, dict):
for key in ["sources", "sinks", "total_steps"]:
if key in result:
paginated["result"][key] = result[key]
return paginated
@mcp_tool()
def ui_get_current_address(self, port: Optional[int] = None) -> Dict[str, Any]:
"""Get the address currently selected in Ghidra's UI.
Args:
port: Ghidra instance port (optional)
Returns:
Current address information
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, "address")
return self.simplify_response(response)
@mcp_tool()
def ui_get_current_function(self, port: Optional[int] = None) -> Dict[str, Any]:
"""Get the function currently selected in Ghidra's UI.
Args:
port: Ghidra instance port (optional)
Returns:
Current function information
"""
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
response = self.safe_get(port, "function")
return self.simplify_response(response)
@mcp_tool()
def comments_set(
self,
address: str,
comment: str = "",
comment_type: str = "plate",
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Set a comment at the specified address.
Args:
address: Memory address in hex format
comment: Comment text (empty string removes comment)
comment_type: "plate", "pre", "post", "eol", "repeatable"
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
payload = {"comment": comment}
response = self.safe_post(port, f"memory/{address}/comments/{comment_type}", payload)
return self.simplify_response(response)
@mcp_tool()
def functions_set_comment(
self,
address: str,
comment: str = "",
port: Optional[int] = None,
) -> Dict[str, Any]:
"""Set a decompiler-friendly comment (function comment with fallback).
Args:
address: Memory address (preferably function entry point)
comment: Comment text (empty string removes comment)
port: Ghidra instance port (optional)
Returns:
Operation result
"""
if not address:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address parameter is required",
},
}
try:
port = self.get_instance_port(port)
except ValueError as e:
return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}}
# Try setting as function comment first
try:
payload = {"comment": comment}
response = self.safe_patch(port, f"functions/{address}", payload)
if response.get("success", False):
return self.simplify_response(response)
except Exception:
pass
# Fallback to pre-comment
return self.comments_set(
address=address,
comment=comment,
comment_type="pre",
port=port,
)