"""Analysis mixin for GhydraMCP. Provides tools for program analysis operations. """ from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool from ..config import get_config from ..core.logging import logger from .base import GhydraMixinBase class AnalysisMixin(GhydraMixinBase): """Mixin for analysis operations. Provides tools for: - Running program analysis - Call graph analysis - Data flow analysis - UI state queries - Comment management """ @mcp_tool() def analysis_run( self, port: Optional[int] = None, analysis_options: Optional[Dict[str, Any]] = None, ) -> Dict[str, Any]: """Run analysis on the current program. Args: port: Ghidra instance port (optional) analysis_options: Analysis options to enable/disable Returns: Analysis operation result """ try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} response = self.safe_post(port, "analysis", analysis_options or {}) return self.simplify_response(response) @mcp_tool() def analysis_get_callgraph( self, name: Optional[str] = None, address: Optional[str] = None, max_depth: int = 3, port: Optional[int] = None, page_size: int = 50, grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Get function call graph with edge pagination. Args: name: Starting function name (mutually exclusive with address) address: Starting function address max_depth: Maximum call depth (default: 3) port: Ghidra instance port (optional) page_size: Edges per page (default: 50, max: 500) grep: Regex pattern to filter edges grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all edges without pagination fields: Field names to keep per edge (e.g. ['from', 'to']). Reduces response size. ctx: FastMCP context (auto-injected) Returns: Call graph with paginated edges """ try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} config = get_config() params = {"max_depth": max_depth} if address: params["address"] = address func_id = address elif name: params["name"] = name func_id = name else: func_id = "entry_point" response = self.safe_get(port, "analysis/callgraph", params) simplified = self.simplify_response(response) if not simplified.get("success", True): return simplified result = simplified.get("result", {}) edges = result.get("edges", []) if isinstance(result, dict) else [] nodes = result.get("nodes", []) if isinstance(result, dict) else [] if not edges: return simplified query_params = { "tool": "analysis_get_callgraph", "port": port, "name": name, "address": address, "max_depth": max_depth, "grep": grep, } session_id = self._get_session_id(ctx) paginated = self.filtered_paginate( data=edges, query_params=query_params, tool_name="analysis_get_callgraph", session_id=session_id, page_size=min(page_size, config.max_page_size), grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, fields=fields, ) if paginated.get("success") and not paginated.get("guarded"): paginated["result"] = { "root_function": func_id, "max_depth": max_depth, "nodes": nodes, "edges": paginated.get("result", []), "total_nodes": len(nodes), } return paginated @mcp_tool() def analysis_get_dataflow( self, address: str, direction: str = "forward", max_steps: int = 50, port: Optional[int] = None, page_size: int = 50, grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Perform data flow analysis with step pagination. Args: address: Starting address in hex format direction: "forward" or "backward" (default: "forward") max_steps: Maximum analysis steps (default: 50) port: Ghidra instance port (optional) page_size: Steps per page (default: 50, max: 500) grep: Regex pattern to filter steps grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all steps without pagination fields: Field names to keep per step. Reduces response size. ctx: FastMCP context (auto-injected) Returns: Data flow steps with pagination """ if not address: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Address parameter is required", }, } try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} config = get_config() params = { "address": address, "direction": direction, "max_steps": max_steps, } response = self.safe_get(port, "analysis/dataflow", params) simplified = self.simplify_response(response) if not simplified.get("success", True): return simplified result = simplified.get("result", {}) steps = result.get("steps", []) if isinstance(result, dict) else [] if not steps: return simplified query_params = { "tool": "analysis_get_dataflow", "port": port, "address": address, "direction": direction, "max_steps": max_steps, "grep": grep, } session_id = self._get_session_id(ctx) paginated = self.filtered_paginate( data=steps, query_params=query_params, tool_name="analysis_get_dataflow", session_id=session_id, page_size=min(page_size, config.max_page_size), grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, fields=fields, ) # Merge metadata into result (skip if guarded) if paginated.get("success") and not paginated.get("guarded"): paginated["result"] = { "start_address": address, "direction": direction, "steps": paginated.get("result", []), } if isinstance(result, dict): for key in ["sources", "sinks", "total_steps"]: if key in result: paginated["result"][key] = result[key] return paginated @mcp_tool() def ui_get_current_address(self, port: Optional[int] = None) -> Dict[str, Any]: """Get the address currently selected in Ghidra's UI. Args: port: Ghidra instance port (optional) Returns: Current address information """ try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} response = self.safe_get(port, "address") return self.simplify_response(response) @mcp_tool() def ui_get_current_function(self, port: Optional[int] = None) -> Dict[str, Any]: """Get the function currently selected in Ghidra's UI. Args: port: Ghidra instance port (optional) Returns: Current function information """ try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} response = self.safe_get(port, "function") return self.simplify_response(response) @mcp_tool() def comments_get( self, address: str, comment_type: str = "plate", port: Optional[int] = None, ) -> Dict[str, Any]: """Get a comment at the specified address. Args: address: Memory address in hex format comment_type: "plate", "pre", "post", "eol", "repeatable" port: Ghidra instance port (optional) Returns: Comment text and metadata """ if not address: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Address parameter is required", }, } try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} response = self.safe_get(port, f"memory/{address}/comments/{comment_type}") return self.simplify_response(response) @mcp_tool() def comments_set( self, address: str, comment: str = "", comment_type: str = "plate", port: Optional[int] = None, ) -> Dict[str, Any]: """Set a comment at the specified address. Args: address: Memory address in hex format comment: Comment text (empty string removes comment) comment_type: "plate", "pre", "post", "eol", "repeatable" port: Ghidra instance port (optional) Returns: Operation result """ if not address: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Address parameter is required", }, } try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} payload = {"comment": comment} response = self.safe_post(port, f"memory/{address}/comments/{comment_type}", payload) return self.simplify_response(response) @mcp_tool() def functions_set_comment( self, address: str, comment: str = "", port: Optional[int] = None, ) -> Dict[str, Any]: """Set a decompiler-friendly comment (function comment with fallback). Args: address: Memory address (preferably function entry point) comment: Comment text (empty string removes comment) port: Ghidra instance port (optional) Returns: Operation result """ if not address: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Address parameter is required", }, } try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} # Try setting as function comment first payload = {"comment": comment} response = self.safe_patch(port, f"functions/{address}", payload) if response.get("success", False): return self.simplify_response(response) # Log why function comment failed before falling back error = response.get("error", {}) logger.debug( "Function comment at %s failed (%s), falling back to pre-comment", address, error.get("code", "UNKNOWN"), ) # Fallback to pre-comment return self.comments_set( address=address, comment=comment, comment_type="pre", port=port, )