From 0f9aa2bb47b84d2874062af5a31662a9c2809f42 Mon Sep 17 00:00:00 2001 From: Teal Bauer Date: Tue, 15 Apr 2025 09:02:58 +0200 Subject: [PATCH] feat: Refactor bridge for better MCP integration Implemented the refactoring proposal to optimize the bridge for better MCP integration: - Added resources for context loading (instances, functions, disassembly) - Added prompts for common analysis patterns - Reorganized tools into namespaced functions for better discoverability - Implemented current working instance concept for simpler usage - Added documentation for the namespaces-based approach --- bridge_mcp_hydra.py | 2582 ++++++++++++++++--------------------- refactoring_namespaces.py | 1621 +++++++++++++++++++++++ refactoring_proposal.md | 261 ++++ refactoring_sample.py | 1286 ++++++++++++++++++ 4 files changed, 4289 insertions(+), 1461 deletions(-) create mode 100644 refactoring_namespaces.py create mode 100644 refactoring_proposal.md create mode 100644 refactoring_sample.py diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index 8d428bb..d301ca9 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -5,8 +5,8 @@ # "requests==2.32.3", # ] # /// -# GhydraMCP Bridge for Ghidra HATEOAS API -# This script implements the MCP_BRIDGE_API.md specification +# GhydraMCP Bridge for Ghidra HATEOAS API - Refactored for MCP optimization +# This provides a revised implementation with namespaced tools import os import signal import sys @@ -14,12 +14,13 @@ import threading import time from threading import Lock from typing import Dict, List, Optional, Union, Any -from urllib.parse import quote, urlencode -from urllib.parse import urlparse +from urllib.parse import quote, urlencode, urlparse import requests from mcp.server.fastmcp import FastMCP +# ================= Core Infrastructure ================= + # Allowed origins for CORS/CSRF protection ALLOWED_ORIGINS = os.environ.get( "GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",") @@ -34,20 +35,43 @@ 
QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) # Version information -BRIDGE_VERSION = "v2.0.0-beta.3" -REQUIRED_API_VERSION = 2003 +BRIDGE_VERSION = "v2.0.0-beta.1" +REQUIRED_API_VERSION = 2 + +# Global state for the current instance +current_instance_port = DEFAULT_GHIDRA_PORT instructions = """ GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware. -First, run `discover_instances` to find open Ghidra instances. List tools to see what GhydraMCP can do. +First, run `instances_discover()` to find open Ghidra instances. Then use `instances_use(port)` to set your working instance. + +The API is organized into namespaces for different types of operations: +- instances_* : For managing Ghidra instances +- functions_* : For working with functions +- data_* : For working with data items +- memory_* : For memory access +- xrefs_* : For cross-references +- analysis_* : For program analysis """ mcp = FastMCP("GhydraMCP", version=BRIDGE_VERSION, instructions=instructions) ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST) +# Helper function to get the current instance or validate a specific port +def _get_instance_port(port=None): + """Internal helper to get the current instance port or validate a specific port""" + port = port or current_instance_port + # Validate that the instance exists and is active + if port not in active_instances: + # Try to register it if not found + register_instance(port) + if port not in active_instances: + raise ValueError(f"No active Ghidra instance on port {port}") + return port +# The rest of the utility functions (HTTP helpers, etc.) remain the same... 
def get_instance_url(port: int) -> str: """Get URL for a Ghidra instance by port""" with instances_lock: @@ -61,7 +85,6 @@ def get_instance_url(port: int) -> str: return f"http://{ghidra_host}:{port}" - def validate_origin(headers: dict) -> bool: """Validate request origin against allowed origins""" origin = headers.get("Origin") @@ -80,7 +103,6 @@ def validate_origin(headers: dict) -> bool: return origin_base in ALLOWED_ORIGINS - def _make_request(method: str, port: int, endpoint: str, params: dict = None, json_data: dict = None, data: str = None, headers: dict = None) -> dict: @@ -201,18 +223,15 @@ def _make_request(method: str, port: int, endpoint: str, params: dict = None, "timestamp": int(time.time() * 1000) } - def safe_get(port: int, endpoint: str, params: dict = None) -> dict: """Make GET request to Ghidra instance""" return _make_request("GET", port, endpoint, params=params) - def safe_put(port: int, endpoint: str, data: dict) -> dict: """Make PUT request to Ghidra instance with JSON payload""" headers = data.pop("headers", None) if isinstance(data, dict) else None return _make_request("PUT", port, endpoint, json_data=data, headers=headers) - def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict: """Perform a POST request to a specific Ghidra instance with JSON or text payload""" headers = None @@ -227,18 +246,15 @@ def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict: return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers) - def safe_patch(port: int, endpoint: str, data: dict) -> dict: """Perform a PATCH request to a specific Ghidra instance with JSON payload""" headers = data.pop("headers", None) if isinstance(data, dict) else None return _make_request("PATCH", port, endpoint, json_data=data, headers=headers) - def safe_delete(port: int, endpoint: str) -> dict: """Perform a DELETE request to a specific Ghidra instance""" return _make_request("DELETE", port, endpoint) - 
def simplify_response(response: dict) -> dict: """ Simplify HATEOAS response data for easier AI agent consumption @@ -340,31 +356,6 @@ def simplify_response(response: dict) -> dict: return result - -# Instance management tools - -@mcp.tool() -def list_instances() -> dict: - """List all active Ghidra instances - - Returns: - dict: Contains 'instances' list with port, url, project and file info for each instance - """ - with instances_lock: - return { - "instances": [ - { - "port": port, - "url": info["url"], - "project": info.get("project", ""), - "file": info.get("file", "") - } - for port, info in active_instances.items() - ] - } - - -@mcp.tool() def register_instance(port: int, url: str = None) -> str: """Register a new Ghidra instance @@ -458,37 +449,6 @@ def register_instance(port: int, url: str = None) -> str: except Exception as e: return f"Error: Could not connect to instance at {url}: {str(e)}" - -@mcp.tool() -def unregister_instance(port: int) -> str: - """Unregister a Ghidra instance - - Args: - port: Port number of the instance to unregister - - Returns: - str: Confirmation message or error - """ - with instances_lock: - if port in active_instances: - del active_instances[port] - return f"Unregistered instance on port {port}" - return f"No instance found on port {port}" - - -@mcp.tool() -def discover_instances(host: str = None) -> dict: - """Discover available Ghidra instances by scanning ports - - Args: - host: Optional host to scan (default: configured ghidra_host) - - Returns: - dict: Contains 'found' count and 'instances' list with discovery results - """ - return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5) - - def _discover_instances(port_range, host=None, timeout=0.5) -> dict: """Internal function to discover Ghidra instances by scanning ports""" found_instances = [] @@ -552,1395 +512,6 @@ def _discover_instances(port_range, host=None, timeout=0.5) -> dict: "instances": found_instances } - -@mcp.tool() -def 
get_plugin_version(port: int = DEFAULT_GHIDRA_PORT) -> dict: - """Get version information for the Ghidra plugin - - Args: - port: Ghidra instance port (default: 8192) - - Returns: - dict: Plugin and API version information - """ - response = safe_get(port, "plugin-version") - return simplify_response(response) - - -@mcp.tool() -def get_program_info(port: int = DEFAULT_GHIDRA_PORT) -> dict: - """Get detailed information about the current program - - Args: - port: Ghidra instance port (default: 8192) - - Returns: - dict: Contains metadata about the current program including name, - architecture, memory layout, compiler, etc. - """ - response = safe_get(port, "program") - return simplify_response(response) - - -@mcp.tool() -def list_functions(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100, - addr: str = None, - name: str = None, - name_contains: str = None, - name_matches_regex: str = None) -> dict: - """List functions in the current program with filtering and pagination - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - addr: Filter by address (hexadecimal) - name: Exact name match filter (case-sensitive) - name_contains: Substring name filter (case-insensitive) - name_matches_regex: Regex name filter - - Returns: - dict: { - "result": list of function info objects, - "size": total count, - "offset": current offset, - "limit": current limit - } - """ - params = { - "offset": offset, - "limit": limit - } - if addr: - params["addr"] = addr - if name: - params["name"] = name - if name_contains: - params["name_contains"] = name_contains - if name_matches_regex: - params["name_matches_regex"] = name_matches_regex - - response = safe_get(port, "functions", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", 
len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified - - -@mcp.tool() -def get_function(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - name: str = None) -> dict: - """Get details for a function by address or name - - Args: - port: Ghidra instance port (default: 8192) - address: Function address in hex format (mutually exclusive with name) - name: Function name (mutually exclusive with address) - - Returns: - dict: Contains function name, address, signature and other details - """ - if not address and not name: - return { - "success": False, - "error": "Either address or name parameter is required", - "timestamp": int(time.time() * 1000) - } - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" - - response = safe_get(port, endpoint) - return simplify_response(response) - - -@mcp.tool() -def decompile_function(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - name: str = None, - syntax_tree: bool = False, - style: str = "normalize") -> dict: - """Get decompiled code for a function by address or name - - Args: - port: Ghidra instance port (default: 8192) - address: Function address in hex format (mutually exclusive with name) - name: Function name (mutually exclusive with address) - syntax_tree: Include syntax tree (default: False) - style: Decompiler style (default: "normalize") - - Returns: - dict: Contains function information and decompiled code - """ - if not address and not name: - return { - "success": False, - "error": "Either address or name parameter is required", - "timestamp": int(time.time() * 1000) - } - - params = { - "syntax_tree": str(syntax_tree).lower(), - "style": style - } - - if address: - endpoint = f"functions/{address}/decompile" - else: - endpoint = f"functions/by-name/{quote(name)}/decompile" - - response = safe_get(port, endpoint, params) - simplified = simplify_response(response) - 
- # For AI consumption, make the decompiled code more directly accessible - if "result" in simplified and isinstance(simplified["result"], dict): - if "decompiled" in simplified["result"]: - simplified["decompiled_code"] = simplified["result"]["decompiled"] - elif "ccode" in simplified["result"]: - simplified["decompiled_code"] = simplified["result"]["ccode"] - elif "decompiled_text" in simplified["result"]: - simplified["decompiled_code"] = simplified["result"]["decompiled_text"] - - return simplified - - -@mcp.tool() -def disassemble_function(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - name: str = None) -> dict: - """Get disassembly for a function by address or name - - Args: - port: Ghidra instance port (default: 8192) - address: Function address in hex format (mutually exclusive with name) - name: Function name (mutually exclusive with address) - - Returns: - dict: Contains function information and disassembly text, optimized for agent consumption - """ - if not address and not name: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either address or name parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - if address: - endpoint = f"functions/{address}/disassembly" - else: - endpoint = f"functions/by-name/{quote(name)}/disassembly" - - response = safe_get(port, endpoint) - simplified = simplify_response(response) - - # For AI consumption, create a simplified response with just the disassembly text - if "result" in simplified and isinstance(simplified["result"], dict): - result = simplified["result"] - function_info = None - disasm_text = None - - # Extract function info if available - if "function" in result and isinstance(result["function"], dict): - function_info = result["function"] - - # Get the disassembly text, generate it if it doesn't exist - if "disassembly_text" in result: - disasm_text = result["disassembly_text"] - elif "instructions" in result and 
isinstance(result["instructions"], list): - instr_list = result["instructions"] - disasm_text = "" - for instr in instr_list: - if isinstance(instr, dict): - addr = instr.get("address", "") - mnemonic = instr.get("mnemonic", "") - operands = instr.get("operands", "") - bytes_str = instr.get("bytes", "") - - # Format: address: bytes mnemonic operands - disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" - - # Create a simplified result that's easier for agents to consume - if disasm_text: - # Create a new response with just the important info - new_response = { - "success": True, - "id": simplified.get("id", ""), - "instance": simplified.get("instance", ""), - "timestamp": simplified.get("timestamp", int(time.time() * 1000)), - "disassembly": disasm_text # Direct access to disassembly text - } - - # Add function info if available - if function_info: - new_response["function_name"] = function_info.get("name", "") - new_response["function_address"] = function_info.get("address", "") - if "signature" in function_info: - new_response["function_signature"] = function_info.get("signature", "") - - # Preserve API links if available - if "api_links" in simplified: - new_response["api_links"] = simplified["api_links"] - - return new_response - - # If we couldn't extract disassembly text, return the original response - return simplified - - -@mcp.tool() -def get_function_variables(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - name: str = None) -> dict: - """Get variables for a function by address or name - - Args: - port: Ghidra instance port (default: 8192) - address: Function address in hex format (mutually exclusive with name) - name: Function name (mutually exclusive with address) - - Returns: - dict: Contains function information and list of variables - """ - if not address and not name: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either address or name parameter is required" - }, - "timestamp": 
int(time.time() * 1000) - } - - if address: - endpoint = f"functions/{address}/variables" - else: - endpoint = f"functions/by-name/{quote(name)}/variables" - - response = safe_get(port, endpoint) - return simplify_response(response) - - -@mcp.tool() -def list_segments(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100, - name: str = None) -> dict: - """List memory segments with filtering and pagination - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - name: Filter by segment name (case-sensitive substring match) - - Returns: - dict: { - "result": list of segment objects with properties including name, start, end, size, - permissions (readable, writable, executable), and initialized status, - "size": total count of segments matching the filter, - "offset": current offset in pagination, - "limit": current limit for pagination - } - """ - params = { - "offset": offset, - "limit": limit - } - if name: - params["name"] = name - - response = safe_get(port, "segments", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified - - -@mcp.tool() -def list_symbols(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100, - addr: str = None, - name: str = None, - name_contains: str = None, - type: str = None) -> dict: - """List symbols with filtering and pagination - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - addr: Filter by address (hexadecimal) - name: Exact name match filter (case-sensitive) - name_contains: Substring name filter (case-insensitive) - type: Filter by symbol type 
(e.g. "function", "data", "label") - - Returns: - dict: { - "result": list of symbol objects, - "size": total count, - "offset": current offset, - "limit": current limit - } - """ - params = { - "offset": offset, - "limit": limit - } - if addr: - params["addr"] = addr - if name: - params["name"] = name - if name_contains: - params["name_contains"] = name_contains - if type: - params["type"] = type - - response = safe_get(port, "symbols", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified - - -@mcp.tool() -def list_variables(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100, - search: str = None, - global_only: bool = False) -> dict: - """List all variables in the program with pagination - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - search: Optional search term to filter variables by name - global_only: If True, only return global variables (default: False) - - Returns: - dict: Contains list of variables with metadata and pagination info - """ - params = { - "offset": offset, - "limit": limit - } - - if search: - params["search"] = search - - if global_only: - params["global_only"] = str(global_only).lower() - - response = safe_get(port, "variables", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - result_size = 0 - if "result" in simplified and isinstance(simplified["result"], list): - result_size = len(simplified["result"]) - simplified.setdefault("size", result_size) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - 
return simplified - - -@mcp.tool() -def list_data_items(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100, - addr: str = None, - name: str = None, - name_contains: str = None, - type: str = None) -> dict: - """List defined data items with filtering and pagination - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - addr: Filter by address (hexadecimal) - name: Exact name match filter (case-sensitive) - name_contains: Substring name filter (case-insensitive) - type: Filter by data type (e.g. "string", "dword") - - Returns: - dict: { - "result": list of data item objects, - "size": total count, - "offset": current offset, - "limit": current limit - } - """ - params = { - "offset": offset, - "limit": limit - } - if addr: - params["addr"] = addr - if name: - params["name"] = name - if name_contains: - params["name_contains"] = name_contains - if type: - params["type"] = type - - response = safe_get(port, "data", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified - - -@mcp.tool() -def read_memory(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - length: int = 16, - format: str = "hex") -> dict: - """Read bytes from memory - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - length: Number of bytes to read (default: 16) - format: Output format - "hex", "base64", or "string" (default: "hex") - - Returns: - dict: { - "address": original address, - "length": bytes read, - "format": output format, - "hexBytes": the memory contents as hex string, - "rawBytes": the memory contents as base64 string, - "timestamp": response timestamp - } - 
""" - if not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - # Use query parameters instead of path parameters for more reliable handling - params = { - "address": address, - "length": length, - "format": format - } - - response = safe_get(port, "memory", params) - simplified = simplify_response(response) - - # Ensure the result is simple and directly usable - if "result" in simplified and isinstance(simplified["result"], dict): - result = simplified["result"] - - # Pass through all representations of the bytes - memory_info = { - "success": True, - "address": result.get("address", address), - "length": result.get("bytesRead", length), - "format": format, - "timestamp": simplified.get("timestamp", int(time.time() * 1000)) - } - - # Include all the different byte representations - if "hexBytes" in result: - memory_info["hexBytes"] = result["hexBytes"] - if "rawBytes" in result: - memory_info["rawBytes"] = result["rawBytes"] - - return memory_info - - return simplified - - -@mcp.tool() -def write_memory(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - bytes_data: str = None, - format: str = "hex") -> dict: - """Write bytes to memory (use with caution) - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - bytes_data: Data to write (format depends on 'format' parameter) - format: Input format - "hex", "base64", or "string" (default: "hex") - - Returns: - dict: Operation result with success status containing: - - address: the target memory address - - length: number of bytes written - - bytesWritten: confirmation of bytes written - """ - if not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - if not bytes_data: - return { - "success": False, - "error": { - 
"code": "MISSING_PARAMETER", - "message": "Bytes parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - payload = { - "bytes": bytes_data, - "format": format - } - - response = safe_patch(port, f"memory/{address}", payload) - return simplify_response(response) - - -@mcp.tool() -def list_xrefs(port: int = DEFAULT_GHIDRA_PORT, - to_addr: str = None, - from_addr: str = None, - type: str = None, - offset: int = 0, - limit: int = 100) -> dict: - """List cross-references with filtering and pagination - - Args: - port: Ghidra instance port (default: 8192) - to_addr: Filter references to this address (hexadecimal) - from_addr: Filter references from this address (hexadecimal) - type: Filter by reference type (e.g. "CALL", "READ", "WRITE") - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - - Returns: - dict: { - "result": list of xref objects with from_addr, to_addr, type, from_function, to_function fields, - "size": total number of xrefs matching the filter, - "offset": current offset for pagination, - "limit": current limit for pagination, - "xrefs": simplified array of cross-references for AI consumption - } - """ - # At least one of the address parameters must be provided - if not to_addr and not from_addr: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either to_addr or from_addr parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - params = { - "offset": offset, - "limit": limit - } - if to_addr: - params["to_addr"] = to_addr - if from_addr: - params["from_addr"] = from_addr - if type: - params["type"] = type - - response = safe_get(port, "xrefs", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", 
limit) - - # Create a simplified, flattened view of references for AI consumption - if "result" in simplified and isinstance(simplified["result"], dict) and "references" in simplified["result"]: - references = simplified["result"]["references"] - flat_refs = [] - - for ref in references: - flat_ref = { - "from_addr": ref.get("from_addr"), - "to_addr": ref.get("to_addr"), - "type": ref.get("refType") - } - - # Add source function info if available - if "from_function" in ref and isinstance(ref["from_function"], dict): - flat_ref["from_function"] = ref["from_function"].get("name") - flat_ref["from_function_addr"] = ref["from_function"].get("address") - - # Add navigational URLs for HATEOAS - if ref["from_function"].get("address"): - flat_ref["from_function_decompile_url"] = f"functions/{ref['from_function'].get('address')}/decompile" - flat_ref["from_function_disassembly_url"] = f"functions/{ref['from_function'].get('address')}/disassembly" - - # Add target function info if available - if "to_function" in ref and isinstance(ref["to_function"], dict): - flat_ref["to_function"] = ref["to_function"].get("name") - flat_ref["to_function_addr"] = ref["to_function"].get("address") - - # Add navigational URLs for HATEOAS - if ref["to_function"].get("address"): - flat_ref["to_function_decompile_url"] = f"functions/{ref['to_function'].get('address')}/decompile" - flat_ref["to_function_disassembly_url"] = f"functions/{ref['to_function'].get('address')}/disassembly" - - # Add symbol info if available - if "from_symbol" in ref: - flat_ref["from_symbol"] = ref["from_symbol"] - if "to_symbol" in ref: - flat_ref["to_symbol"] = ref["to_symbol"] - - # Add instruction text if available - if "from_instruction" in ref: - flat_ref["from_instruction"] = ref["from_instruction"] - if "to_instruction" in ref: - flat_ref["to_instruction"] = ref["to_instruction"] - - # Add other useful HATEOAS links - if flat_ref.get("from_addr"): - flat_ref["from_memory_url"] = 
f"memory/{flat_ref['from_addr']}" - if flat_ref.get("to_addr"): - flat_ref["to_memory_url"] = f"memory/{flat_ref['to_addr']}" - - flat_refs.append(flat_ref) - - # Add the simplified references - simplified["xrefs"] = flat_refs - - # Create a text representation for easier consumption - text_refs = [] - for ref in flat_refs: - from_func = f"[{ref.get('from_function', '??')}]" if "from_function" in ref else "" - to_func = f"[{ref.get('to_function', '??')}]" if "to_function" in ref else "" - - line = f"{ref.get('from_addr')} {from_func} -> {ref.get('to_addr')} {to_func} ({ref.get('type', '??')})" - text_refs.append(line) - - simplified["xrefs_text"] = "\n".join(text_refs) - - # Add navigation links for next/previous pages - if offset > 0: - prev_offset = max(0, offset - limit) - simplified["prev_page_url"] = f"xrefs?offset={prev_offset}&limit={limit}" - if to_addr: - simplified["prev_page_url"] += f"&to_addr={to_addr}" - if from_addr: - simplified["prev_page_url"] += f"&from_addr={from_addr}" - if type: - simplified["prev_page_url"] += f"&type={type}" - - total_size = simplified.get("size", 0) - if offset + limit < total_size: - next_offset = offset + limit - simplified["next_page_url"] = f"xrefs?offset={next_offset}&limit={limit}" - if to_addr: - simplified["next_page_url"] += f"&to_addr={to_addr}" - if from_addr: - simplified["next_page_url"] += f"&from_addr={from_addr}" - if type: - simplified["next_page_url"] += f"&type={type}" - - return simplified - - -@mcp.tool() -def get_current_address(port: int = DEFAULT_GHIDRA_PORT) -> dict: - """Get the address currently selected in Ghidra's UI - - Args: - port: Ghidra instance port (default: 8192) - - Returns: - Dict containing: - - success: boolean indicating success - - result: object with address field - - error: error message if failed - - timestamp: timestamp of response - """ - response = safe_get(port, "address") - return simplify_response(response) - - -@mcp.tool() -def get_current_function(port: int = 
DEFAULT_GHIDRA_PORT) -> dict: - """Get the function currently selected in Ghidra's UI - - Args: - port: Ghidra instance port (default: 8192) - - Returns: - Dict containing: - - success: boolean indicating success - - result: object with name, address and signature fields - - error: error message if failed - - timestamp: timestamp of response - """ - response = safe_get(port, "function") - return simplify_response(response) - - -@mcp.tool() -def analyze_program(port: int = DEFAULT_GHIDRA_PORT, - analysis_options: dict = None) -> dict: - """Run analysis on the current program - - Args: - port: Ghidra instance port (default: 8192) - analysis_options: Dictionary of analysis options to enable/disable - (e.g. {"functionRecovery": True, "dataRefs": False}) - None means use default analysis options - - Returns: - dict: Analysis operation result with status containing: - - program: program name - - analysis_triggered: boolean indicating if analysis was successfully started - - message: status message - """ - response = safe_post(port, "analysis", analysis_options or {}) - return simplify_response(response) - - -@mcp.tool() -def create_function(port: int = DEFAULT_GHIDRA_PORT, - address: str = "") -> dict: - """Create a new function at the specified address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format where function starts - - Returns: - dict: Operation result with the created function information - """ - if not address: - return { - "success": False, - "error": "Address parameter is required", - "timestamp": int(time.time() * 1000) - } - - payload = { - "address": address - } - - response = safe_post(port, "functions", payload) - return simplify_response(response) - - -@mcp.tool() -def rename_function(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - name: str = None, - new_name: str = "") -> dict: - """Rename a function - - Args: - port: Ghidra instance port (default: 8192) - address: Function address in hex format 
(mutually exclusive with name) - name: Current function name (mutually exclusive with address) - new_name: New function name - - Returns: - dict: Operation result with the updated function information - """ - if not (address or name) or not new_name: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either address or name, and new_name parameters are required" - }, - "timestamp": int(time.time() * 1000) - } - - payload = { - "name": new_name - } - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" - - response = safe_patch(port, endpoint, payload) - return simplify_response(response) - - -@mcp.tool() -def set_function_signature(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - name: str = None, - signature: str = "") -> dict: - """Set function signature/prototype - - Args: - port: Ghidra instance port (default: 8192) - address: Function address in hex format (mutually exclusive with name) - name: Function name (mutually exclusive with address) - signature: New function signature (e.g., "int func(char *data, int size)") - - Returns: - dict: Operation result with the updated function information - """ - if not (address or name) or not signature: - return { - "success": False, - "error": "Either address or name, and signature parameters are required", - "timestamp": int(time.time() * 1000) - } - - payload = { - "signature": signature - } - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" - - response = safe_patch(port, endpoint, payload) - return simplify_response(response) - - -@mcp.tool() -def rename_variable(port: int = DEFAULT_GHIDRA_PORT, - function_address: str = None, - function_name: str = None, - variable_name: str = "", - new_name: str = "") -> dict: - """Rename a variable in a function - - Args: - port: Ghidra instance port (default: 8192) - function_address: Function address in hex format (mutually 
exclusive with function_name) - function_name: Function name (mutually exclusive with function_address) - variable_name: Current variable name - new_name: New variable name - - Returns: - dict: Operation result with the updated variable information - """ - if not (function_address or function_name) or not variable_name or not new_name: - return { - "success": False, - "error": "Function identifier (address or name), variable_name, and new_name parameters are required", - "timestamp": int(time.time() * 1000) - } - - payload = { - "name": new_name - } - - if function_address: - endpoint = f"functions/{function_address}/variables/{variable_name}" - else: - endpoint = f"functions/by-name/{quote(function_name)}/variables/{variable_name}" - - response = safe_patch(port, endpoint, payload) - return simplify_response(response) - - -@mcp.tool() -def set_variable_type(port: int = DEFAULT_GHIDRA_PORT, - function_address: str = None, - function_name: str = None, - variable_name: str = "", - data_type: str = "") -> dict: - """Change the data type of a variable in a function - - Args: - port: Ghidra instance port (default: 8192) - function_address: Function address in hex format (mutually exclusive with function_name) - function_name: Function name (mutually exclusive with function_address) - variable_name: Variable name - data_type: New data type (e.g. 
"int", "char *") - - Returns: - dict: Operation result with the updated variable information - """ - if not (function_address or function_name) or not variable_name or not data_type: - return { - "success": False, - "error": "Function identifier (address or name), variable_name, and data_type parameters are required", - "timestamp": int(time.time() * 1000) - } - - payload = { - "data_type": data_type - } - - if function_address: - endpoint = f"functions/{function_address}/variables/{variable_name}" - else: - endpoint = f"functions/by-name/{quote(function_name)}/variables/{variable_name}" - - response = safe_patch(port, endpoint, payload) - return simplify_response(response) - - -@mcp.tool() -def create_data(port: int = DEFAULT_GHIDRA_PORT, - address: str = "", - data_type: str = "", - size: int = None) -> dict: - """Define a new data item at the specified address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - data_type: Data type (e.g. "string", "dword", "byte") - size: Optional size in bytes for the data item - - Returns: - dict: Operation result with the created data information - """ - if not address or not data_type: - return { - "success": False, - "error": "Address and data_type parameters are required", - "timestamp": int(time.time() * 1000) - } - - payload = { - "address": address, - "type": data_type - } - - if size is not None: - payload["size"] = size - - response = safe_post(port, "data", payload) - return simplify_response(response) - - -@mcp.tool() -def delete_data(port: int = DEFAULT_GHIDRA_PORT, - address: str = "") -> dict: - """Delete data at the specified address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - - Returns: - dict: Operation result - """ - if not address: - return { - "success": False, - "error": "Address parameter is required", - "timestamp": int(time.time() * 1000) - } - - payload = { - "address": address, - "action": "delete" - } - 
- response = safe_post(port, "data/delete", payload) - return simplify_response(response) - - -@mcp.tool() -def rename_data(port: int = DEFAULT_GHIDRA_PORT, - address: str = "", - name: str = "") -> dict: - """Rename a data item - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - name: New name for the data item - - Returns: - dict: Operation result with the updated data information - """ - if not address or not name: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address and name parameters are required" - }, - "timestamp": int(time.time() * 1000) - } - - payload = { - "address": address, - "newName": name - } - - response = safe_post(port, "data", payload) - return simplify_response(response) - - -@mcp.tool() -def list_strings(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 2000, - filter: str = None) -> dict: - """List all defined strings in the binary with their memory addresses - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum strings to return (default: 2000) - filter: Optional string content filter - - Returns: - dict: List of string data with addresses, values, and metadata - """ - params = { - "offset": offset, - "limit": limit - } - - if filter: - params["filter"] = filter - - response = safe_get(port, "strings", params) - return simplify_response(response) - - -@mcp.tool() -def update_data(port: int = DEFAULT_GHIDRA_PORT, - address: str = "", - name: str = None, - data_type: str = None) -> dict: - """Update a data item's name and/or type - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - name: New name for the data item - data_type: New data type (e.g. 
"uint32_t *", "char[10]", "struct point") - - Returns: - dict: Operation result with the updated data information - """ - if not address or (name is None and data_type is None): - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address parameter and at least one of name or data_type are required" - }, - "timestamp": int(time.time() * 1000) - } - - payload = { - "address": address - } - - if name: - payload["newName"] = name - - if data_type: - payload["type"] = data_type - - # Handle different cases for maximum reliability - if name and data_type is None: - # If only renaming, use the main data endpoint - response = safe_post(port, "data", payload) - return simplify_response(response) - - if data_type and name is None: - # If only changing type, use the data/type endpoint - response = safe_post(port, "data/type", payload) - return simplify_response(response) - - if name and data_type: - # If both name and type, use the data/update endpoint - response = safe_post(port, "data/update", payload) - return simplify_response(response) - - # This shouldn't be reached due to earlier checks - return { - "success": False, - "error": { - "code": "INVALID_REQUEST", - "message": "Neither name nor data_type specified" - }, - "timestamp": int(time.time() * 1000) - } - - -@mcp.tool() -def set_data_type(port: int = DEFAULT_GHIDRA_PORT, - address: str = "", - data_type: str = "") -> dict: - """Set the data type of a data item - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - data_type: Data type name (e.g. 
"uint32_t", "char[10]") - - Returns: - dict: Operation result with the updated data information - """ - if not address or not data_type: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address and data_type parameters are required" - }, - "timestamp": int(time.time() * 1000) - } - - # We need to first get the current name of the data - try: - # Just use a fixed name based on address for now - current_name = f"DATA_{address}" - - # We're intentionally simplifying by not trying to preserve the current name - # This avoids potential API inconsistencies but means the name might change - - # Prepare the payload with both type and the current name - payload = { - "address": address, - "type": data_type, - "newName": current_name # Preserve the current name - } - - # This uses the POST endpoint to update both type and preserve name - response = safe_post(port, "data", payload) - return simplify_response(response) - except Exception as e: - return { - "success": False, - "error": { - "code": "DATA_TYPE_ERROR", - "message": f"Failed to set data type: {str(e)}" - }, - "timestamp": int(time.time() * 1000) - } - - -@mcp.tool() -def list_namespaces(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100) -> dict: - """List namespaces with pagination - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - - Returns: - dict: Contains list of namespaces with pagination information - """ - params = { - "offset": offset, - "limit": limit - } - - response = safe_get(port, "namespaces", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified - - -@mcp.tool() -def 
get_callgraph(port: int = DEFAULT_GHIDRA_PORT, - function: str = None, - max_depth: int = 3) -> dict: - """Get function call graph visualization data - - Args: - port: Ghidra instance port (default: 8192) - function: Starting function name or address (None starts from entry point) - max_depth: Maximum call depth to analyze (default: 3) - - Returns: - dict: Graph data with: - - root: name of the starting function - - root_address: address of the starting function - - max_depth: depth limit used for graph generation - - nodes: list of function nodes in the graph (with id, name, address) - - edges: list of call relationships between functions - """ - params = {"max_depth": max_depth} - if function: - params["function"] = function - - response = safe_get(port, "analysis/callgraph", params) - return simplify_response(response) - - -@mcp.tool() -def get_dataflow(port: int = DEFAULT_GHIDRA_PORT, - address: str = "", - direction: str = "forward", - max_steps: int = 50) -> dict: - """Perform data flow analysis from an address - - Args: - port: Ghidra instance port (default: 8192) - address: Starting address in hex format - direction: "forward" or "backward" (default: "forward") - max_steps: Maximum analysis steps (default: 50) - - Returns: - dict: Data flow analysis results - """ - if not address: - return { - "success": False, - "error": "Address parameter is required", - "timestamp": int(time.time() * 1000) - } - - params = { - "address": address, - "direction": direction, - "max_steps": max_steps - } - - response = safe_get(port, "analysis/dataflow", params) - return simplify_response(response) - - -@mcp.tool() -def set_comment(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - comment: str = "", - comment_type: str = "plate") -> dict: - """Set a comment at the specified address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - comment: Comment text - comment_type: Type of comment - - "plate" (disassembly), - "pre" 
(pre-function), - "post" (post-function), - "eol" (end of line), - "repeatable" (shows each time referenced) - (default: "plate") - - Returns: - dict: Operation result - """ - if not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - payload = { - "comment": comment - } - - response = safe_post(port, f"memory/{address}/comments/{comment_type}", payload) - return simplify_response(response) - - -@mcp.tool() -def set_decompiler_comment(port: int = DEFAULT_GHIDRA_PORT, - address: str = None, - comment: str = "") -> dict: - """Set a decompiler comment at the specified address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - comment: Comment text - - Returns: - dict: Operation result - """ - if not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - # In Ghidra, function comments need to be set on the function itself, not as plate comments - # Let's first try to get the function at this address - try: - func_response = safe_get(port, f"functions/{address}") - if func_response.get("success", True): - # We have a function, let's use the function comment endpoint - payload = { - "comment": comment - } - - # Use the function update endpoint with just the comment field - response = safe_patch(port, f"functions/{address}", payload) - return simplify_response(response) - except Exception as e: - logger.error(f"Error setting function comment: {e}") - # Fall back to plate comment if function-specific approach fails - pass - - # If we couldn't set a function comment, fall back to plate comment as before - return set_comment(port, address, comment, "pre") - - -def handle_sigint(signum, frame): - os._exit(0) - - def periodic_discovery(): """Periodically discover new instances""" 
while True: @@ -2002,13 +573,1102 @@ def periodic_discovery(): time.sleep(30) +def handle_sigint(signum, frame): + os._exit(0) + +# ================= MCP Resources ================= +# Resources provide information that can be loaded directly into context +# They focus on data and minimize metadata + +@mcp.resource() +def ghidra_instance(port: int = None) -> dict: + """Get detailed information about a Ghidra instance and the loaded program + + Args: + port: Specific Ghidra instance port (optional, uses current if omitted) + + Returns: + dict: Detailed information about the Ghidra instance and loaded program + """ + port = _get_instance_port(port) + response = safe_get(port, "program") + + if not isinstance(response, dict) or not response.get("success", False): + return {"error": f"Unable to access Ghidra instance on port {port}"} + + # Extract only the most relevant information for the resource + result = response.get("result", {}) + + if not isinstance(result, dict): + return {"error": "Invalid response format from Ghidra instance"} + + instance_info = { + "port": port, + "url": get_instance_url(port), + "program_name": result.get("name", "unknown"), + "program_id": result.get("programId", "unknown"), + "language": result.get("languageId", "unknown"), + "compiler": result.get("compilerSpecId", "unknown"), + "base_address": result.get("imageBase", "0x0"), + "memory_size": result.get("memorySize", 0), + "analysis_complete": result.get("analysisComplete", False) + } + + # Add project information if available + if "project" in active_instances[port]: + instance_info["project"] = active_instances[port]["project"] + + return instance_info + +@mcp.resource() +def decompiled_function(name: str = None, address: str = None, port: int = None) -> str: + """Get decompiled C code for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with address) + port: Specific Ghidra instance port 
(optional) + + Returns: + str: The decompiled C code as a string, or error message + """ + if not name and not address: + return "Error: Either name or address parameter is required" + + port = _get_instance_port(port) + + params = { + "syntax_tree": "false", + "style": "normalize" + } + + if address: + endpoint = f"functions/{address}/decompile" + else: + endpoint = f"functions/by-name/{quote(name)}/decompile" + + response = safe_get(port, endpoint, params) + simplified = simplify_response(response) + + # For a resource, we want to directly return just the decompiled code + if (not isinstance(simplified, dict) or + not simplified.get("success", False) or + "result" not in simplified): + error_message = "Error: Could not decompile function" + if isinstance(simplified, dict) and "error" in simplified: + if isinstance(simplified["error"], dict): + error_message = simplified["error"].get("message", error_message) + else: + error_message = str(simplified["error"]) + return error_message + + # Extract just the decompiled code text + result = simplified["result"] + + # Different endpoints may return the code in different fields, try all of them + if isinstance(result, dict): + for key in ["decompiled_text", "ccode", "decompiled"]: + if key in result: + return result[key] + + return "Error: Could not extract decompiled code from response" + +@mcp.resource() +def function_info(name: str = None, address: str = None, port: int = None) -> dict: + """Get detailed information about a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with address) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Complete function information including signature, parameters, etc. 
+ """ + if not name and not address: + return {"error": "Either name or address parameter is required"} + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_get(port, endpoint) + simplified = simplify_response(response) + + if (not isinstance(simplified, dict) or + not simplified.get("success", False) or + "result" not in simplified): + error = {"error": "Could not get function information"} + if isinstance(simplified, dict) and "error" in simplified: + error["error_details"] = simplified["error"] + return error + + # Return just the function data without API metadata + return simplified["result"] + +@mcp.resource() +def disassembly(name: str = None, address: str = None, port: int = None) -> str: + """Get disassembled instructions for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with address) + port: Specific Ghidra instance port (optional) + + Returns: + str: Formatted disassembly listing as a string + """ + if not name and not address: + return "Error: Either name or address parameter is required" + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}/disassembly" + else: + endpoint = f"functions/by-name/{quote(name)}/disassembly" + + response = safe_get(port, endpoint) + simplified = simplify_response(response) + + if (not isinstance(simplified, dict) or + not simplified.get("success", False) or + "result" not in simplified): + error_message = "Error: Could not get disassembly" + if isinstance(simplified, dict) and "error" in simplified: + if isinstance(simplified["error"], dict): + error_message = simplified["error"].get("message", error_message) + else: + error_message = str(simplified["error"]) + return error_message + + # For a resource, we want to directly return just the disassembly text + result = simplified["result"] + + 
# Check if we have a disassembly_text field already + if isinstance(result, dict) and "disassembly_text" in result: + return result["disassembly_text"] + + # Otherwise if we have raw instructions, format them ourselves + if isinstance(result, dict) and "instructions" in result and isinstance(result["instructions"], list): + disasm_text = "" + for instr in result["instructions"]: + if isinstance(instr, dict): + addr = instr.get("address", "") + mnemonic = instr.get("mnemonic", "") + operands = instr.get("operands", "") + bytes_str = instr.get("bytes", "") + + # Format: address: bytes mnemonic operands + disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" + + return disasm_text + + # If we have a direct disassembly field, try that as well + if isinstance(result, dict) and "disassembly" in result: + return result["disassembly"] + + return "Error: Could not extract disassembly from response" + +# ================= MCP Prompts ================= +# Prompts define reusable templates for LLM interactions + +@mcp.prompt("analyze_function") +def analyze_function_prompt(name: str = None, address: str = None, port: int = None): + """A prompt to guide the LLM through analyzing a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with address) + port: Specific Ghidra instance port (optional) + """ + port = _get_instance_port(port) + + # Get function name if only address is provided + if address and not name: + fn_info = function_info(address=address, port=port) + if isinstance(fn_info, dict) and "name" in fn_info: + name = fn_info["name"] + + # Create the template that guides analysis + return { + "prompt": f""" + Analyze the following function: {name or address} + + Decompiled code: + ```c + {decompiled_function(name=name, address=address, port=port)} + ``` + + Disassembly: + ``` + {disassembly(name=name, address=address, port=port)} + ``` + + 1. 
What is the purpose of this function? + 2. What are the key parameters and their uses? + 3. What are the return values and their meanings? + 4. Are there any security concerns in this implementation? + 5. Describe the algorithm or process being implemented. + """, + "context": { + "function_info": function_info(name=name, address=address, port=port) + } + } + +@mcp.prompt("identify_vulnerabilities") +def identify_vulnerabilities_prompt(name: str = None, address: str = None, port: int = None): + """A prompt to help identify potential vulnerabilities in a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with address) + port: Specific Ghidra instance port (optional) + """ + port = _get_instance_port(port) + + # Get function name if only address is provided + if address and not name: + fn_info = function_info(address=address, port=port) + if isinstance(fn_info, dict) and "name" in fn_info: + name = fn_info["name"] + + # Create the template focused on security analysis + return { + "prompt": f""" + Analyze the following function for security vulnerabilities: {name or address} + + Decompiled code: + ```c + {decompiled_function(name=name, address=address, port=port)} + ``` + + Look for these vulnerability types: + 1. Buffer overflows or underflows + 2. Integer overflow/underflow + 3. Use-after-free or double-free bugs + 4. Format string vulnerabilities + 5. Missing bounds checks + 6. Insecure memory operations + 7. Race conditions or timing issues + 8. 
Input validation problems + + For each potential vulnerability: + - Describe the vulnerability and where it occurs + - Explain the security impact + - Suggest how it could be exploited + - Recommend a fix + """, + "context": { + "function_info": function_info(name=name, address=address, port=port), + "disassembly": disassembly(name=name, address=address, port=port) + } + } + +# ================= MCP Tools ================= +# Since we can't use tool groups, we'll use namespaces in the function names + +# Instance management tools +@mcp.tool() +def instances_list() -> dict: + """List all active Ghidra instances""" + with instances_lock: + return { + "instances": [ + { + "port": port, + "url": info["url"], + "project": info.get("project", ""), + "file": info.get("file", "") + } + for port, info in active_instances.items() + ] + } + +@mcp.tool() +def instances_discover(host: str = None) -> dict: + """Discover available Ghidra instances by scanning ports + + Args: + host: Optional host to scan (default: configured ghidra_host) + + Returns: + dict: Contains 'found' count and 'instances' list with discovery results + """ + return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5) + +@mcp.tool() +def instances_register(port: int, url: str = None) -> str: + """Register a new Ghidra instance + + Args: + port: Port number of the Ghidra instance + url: Optional URL if different from default http://host:port + + Returns: + str: Confirmation message or error + """ + return register_instance(port, url) + +@mcp.tool() +def instances_unregister(port: int) -> str: + """Unregister a Ghidra instance + + Args: + port: Port number of the instance to unregister + + Returns: + str: Confirmation message or error + """ + with instances_lock: + if port in active_instances: + del active_instances[port] + return f"Unregistered instance on port {port}" + return f"No instance found on port {port}" + +@mcp.tool() +def instances_use(port: int) -> str: + """Set the current working 
Ghidra instance + + Args: + port: Port number of the instance to use + + Returns: + str: Confirmation message or error + """ + global current_instance_port + + # First validate that the instance exists and is active + if port not in active_instances: + # Try to register it if not found + register_instance(port) + if port not in active_instances: + return f"Error: No active Ghidra instance found on port {port}" + + # Set as current instance + current_instance_port = port + + # Return information about the selected instance + with instances_lock: + info = active_instances[port] + program = info.get("file", "unknown program") + project = info.get("project", "unknown project") + return f"Now using Ghidra instance on port {port} with {program} in project {project}" + +@mcp.tool() +def instances_current() -> dict: + """Get information about the current working Ghidra instance + + Returns: + dict: Details about the current instance and program + """ + return ghidra_instance(port=current_instance_port) + +# Function tools +@mcp.tool() +def functions_list(offset: int = 0, limit: int = 100, + name_contains: str = None, + name_matches_regex: str = None, + port: int = None) -> dict: + """List functions with filtering and pagination + + Args: + offset: Pagination offset (default: 0) + limit: Maximum items to return (default: 100) + name_contains: Substring name filter (case-insensitive) + name_matches_regex: Regex name filter + port: Specific Ghidra instance port (optional) + + Returns: + dict: List of functions with pagination information + """ + port = _get_instance_port(port) + + params = { + "offset": offset, + "limit": limit + } + if name_contains: + params["name_contains"] = name_contains + if name_matches_regex: + params["name_matches_regex"] = name_matches_regex + + response = safe_get(port, "functions", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + 
simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified + +@mcp.tool() +def functions_get(name: str = None, address: str = None, port: int = None) -> dict: + """Get detailed information about a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Detailed function information + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_get(port, endpoint) + return simplify_response(response) + +@mcp.tool() +def functions_decompile(name: str = None, address: str = None, + syntax_tree: bool = False, style: str = "normalize", + port: int = None) -> dict: + """Get decompiled code for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + syntax_tree: Include syntax tree (default: False) + style: Decompiler style (default: "normalize") + port: Specific Ghidra instance port (optional) + + Returns: + dict: Contains function information and decompiled code + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + params = { + "syntax_tree": str(syntax_tree).lower(), + "style": style + } + + if address: + endpoint = f"functions/{address}/decompile" + else: + endpoint = 
f"functions/by-name/{quote(name)}/decompile" + + response = safe_get(port, endpoint, params) + simplified = simplify_response(response) + + # For AI consumption, make the decompiled code more directly accessible + if "result" in simplified and isinstance(simplified["result"], dict): + if "decompiled" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["decompiled"] + elif "ccode" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["ccode"] + elif "decompiled_text" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["decompiled_text"] + + return simplified + +@mcp.tool() +def functions_disassemble(name: str = None, address: str = None, port: int = None) -> dict: + """Get disassembly for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Contains function information and disassembly text + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}/disassembly" + else: + endpoint = f"functions/by-name/{quote(name)}/disassembly" + + response = safe_get(port, endpoint) + return simplify_response(response) + +@mcp.tool() +def functions_create(address: str, port: int = None) -> dict: + """Create a new function at the specified address + + Args: + address: Memory address in hex format where function starts + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the created function information + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required" + }, + 
"timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "address": address + } + + response = safe_post(port, "functions", payload) + return simplify_response(response) + +@mcp.tool() +def functions_rename(old_name: str = None, address: str = None, new_name: str = "", port: int = None) -> dict: + """Rename a function + + Args: + old_name: Current function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + new_name: New function name + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the updated function information + """ + if not (old_name or address) or not new_name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either old_name or address, and new_name parameters are required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "name": new_name + } + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(old_name)}" + + response = safe_patch(port, endpoint, payload) + return simplify_response(response) + +@mcp.tool() +def functions_set_signature(name: str = None, address: str = None, signature: str = "", port: int = None) -> dict: + """Set function signature/prototype + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + signature: New function signature (e.g., "int func(char *data, int size)") + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the updated function information + """ + if not (name or address) or not signature: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address, and signature parameters are required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + 
@mcp.tool()
def functions_get_variables(name: str = None, address: str = None, port: int = None) -> dict:
    """Fetch the variables belonging to a function.

    The function is identified by name or by address; when both are given
    the address is the one used to build the request.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Simplified API response for the variables request, or a
            MISSING_PARAMETER error dict when neither identifier is given
    """
    if not (name or address):
        error = {
            "code": "MISSING_PARAMETER",
            "message": "Either name or address parameter is required",
        }
        return {"success": False, "error": error, "timestamp": int(time.time() * 1000)}

    instance_port = _get_instance_port(port)

    selector = address if address else f"by-name/{quote(name)}"
    raw = safe_get(instance_port, f"functions/{selector}/variables")
    return simplify_response(raw)
@mcp.tool()
def memory_write(address: str, bytes_data: str, format: str = "hex", port: int = None) -> dict:
    """Write bytes to memory (use with caution)

    Args:
        address: Memory address in hex format
        bytes_data: Data to write, encoded as described by ``format``
        format: Input format - "hex", "base64", or "string" (default: "hex")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Simplified API response for the write request, or a
            MISSING_PARAMETER error dict when address or bytes_data is empty
    """
    # Required arguments are checked in order; the first empty one is reported
    required = (
        (address, "Address parameter is required"),
        (bytes_data, "Bytes parameter is required"),
    )
    for value, message in required:
        if not value:
            return {
                "success": False,
                "error": {"code": "MISSING_PARAMETER", "message": message},
                "timestamp": int(time.time() * 1000),
            }

    instance_port = _get_instance_port(port)

    body = {"bytes": bytes_data, "format": format}
    raw = safe_patch(instance_port, f"memory/{address}", body)
    return simplify_response(raw)
None, type: str = None, + offset: int = 0, limit: int = 100, port: int = None) -> dict: + """List cross-references with filtering and pagination + + Args: + to_addr: Filter references to this address (hexadecimal) + from_addr: Filter references from this address (hexadecimal) + type: Filter by reference type (e.g. "CALL", "READ", "WRITE") + offset: Pagination offset (default: 0) + limit: Maximum items to return (default: 100) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Cross-references matching the filters + """ + # At least one of the address parameters must be provided + if not to_addr and not from_addr: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either to_addr or from_addr parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + params = { + "offset": offset, + "limit": limit + } + if to_addr: + params["to_addr"] = to_addr + if from_addr: + params["from_addr"] = from_addr + if type: + params["type"] = type + + response = safe_get(port, "xrefs", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified + +# Data tools +@mcp.tool() +def data_list(offset: int = 0, limit: int = 100, addr: str = None, + name: str = None, name_contains: str = None, type: str = None, + port: int = None) -> dict: + """List defined data items with filtering and pagination + + Args: + offset: Pagination offset (default: 0) + limit: Maximum items to return (default: 100) + addr: Filter by address (hexadecimal) + name: Exact name match filter (case-sensitive) + name_contains: Substring name filter (case-insensitive) + type: Filter by data type (e.g. 
@mcp.tool()
def data_create(address: str, data_type: str, size: int = None, port: int = None) -> dict:
    """Define a new data item at the specified address

    Args:
        address: Memory address in hex format
        data_type: Data type (e.g. "string", "dword", "byte")
        size: Optional size in bytes for the data item
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Simplified API response for the create request, or a
            MISSING_PARAMETER error dict when address or data_type is empty
    """
    if not address or not data_type:
        # Use the same structured error shape ({"code", "message"}) as the
        # other tools so callers can handle every validation failure uniformly.
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and data_type parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port = _get_instance_port(port)

    payload = {
        "address": address,
        "type": data_type
    }

    # size is only sent when the caller supplied one
    if size is not None:
        payload["size"] = size

    response = safe_post(port, "data", payload)
    return simplify_response(response)
@mcp.tool()
def data_delete(address: str, port: int = None) -> dict:
    """Delete data at the specified address

    Args:
        address: Memory address in hex format
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Simplified API response for the delete request, or a
            MISSING_PARAMETER error dict when address is empty
    """
    if not address:
        # Use the same structured error shape ({"code", "message"}) as the
        # other tools so callers can handle every validation failure uniformly.
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port = _get_instance_port(port)

    payload = {
        "address": address,
        "action": "delete"
    }

    response = safe_post(port, "data/delete", payload)
    return simplify_response(response)
@mcp.tool()
def analysis_get_dataflow(address: str, direction: str = "forward", max_steps: int = 50, port: int = None) -> dict:
    """Perform data flow analysis from an address

    Args:
        address: Starting address in hex format
        direction: "forward" or "backward" (default: "forward")
        max_steps: Maximum analysis steps (default: 50)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Simplified API response for the data flow request, or a
            MISSING_PARAMETER error dict when address is empty
    """
    if not address:
        # Use the same structured error shape ({"code", "message"}) as the
        # other tools so callers can handle every validation failure uniformly.
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port = _get_instance_port(port)

    params = {
        "address": address,
        "direction": direction,
        "max_steps": max_steps
    }

    response = safe_get(port, "analysis/dataflow", params)
    return simplify_response(response)
Start background discovery thread discovery_thread = threading.Thread( target=periodic_discovery, daemon=True, diff --git a/refactoring_namespaces.py b/refactoring_namespaces.py new file mode 100644 index 0000000..cf6486e --- /dev/null +++ b/refactoring_namespaces.py @@ -0,0 +1,1621 @@ +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "mcp==1.6.0", +# "requests==2.32.3", +# ] +# /// +# GhydraMCP Bridge for Ghidra HATEOAS API - Refactored for MCP optimization +# This provides a revised implementation without tool_group + +import os +import signal +import sys +import threading +import time +from threading import Lock +from typing import Dict, List, Optional, Union, Any +from urllib.parse import quote, urlencode, urlparse + +import requests +from mcp.server.fastmcp import FastMCP + +# ================= Core Infrastructure ================= + +# Allowed origins for CORS/CSRF protection +ALLOWED_ORIGINS = os.environ.get( + "GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",") + +# Track active Ghidra instances (port -> info dict) +active_instances: Dict[int, dict] = {} +instances_lock = Lock() +DEFAULT_GHIDRA_PORT = 8192 +DEFAULT_GHIDRA_HOST = "localhost" +# Port ranges for scanning +QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) +FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) + +# Version information +BRIDGE_VERSION = "v2.0.0-beta.1" +REQUIRED_API_VERSION = 2 + +# Global state for the current instance +current_instance_port = DEFAULT_GHIDRA_PORT + +instructions = """ +GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware. + +First, run `instances_discover()` to find open Ghidra instances. Then use `instances_use(port)` to set your working instance. 
def _get_instance_port(port=None):
    """Resolve the port to operate on and make sure an instance is registered there.

    A falsy ``port`` falls back to the module-level current instance port.
    If the resolved port is not yet tracked, one registration attempt is
    made before giving up.

    Raises:
        ValueError: If no instance is registered on the port after the attempt
    """
    resolved = port if port else current_instance_port

    if resolved in active_instances:
        return resolved

    # Unknown port: try to register it once, then look again
    register_instance(resolved)
    if resolved in active_instances:
        return resolved

    raise ValueError(f"No active Ghidra instance on port {resolved}")
def validate_origin(headers: dict) -> bool:
    """Validate request origin against allowed origins

    Args:
        headers: Header mapping; only the "Origin" entry is inspected

    Returns:
        bool: True when there is no Origin header or when the normalized
            origin (scheme://hostname[:port]) is listed in ALLOWED_ORIGINS;
            False when the origin cannot be parsed or is not listed
    """
    origin = headers.get("Origin")
    if not origin:
        # No origin header - allow (browser same-origin policy applies)
        return True

    # Parse origin to get scheme+hostname
    try:
        parsed = urlparse(origin)
        origin_base = f"{parsed.scheme}://{parsed.hostname}"
        # Accessing .port raises ValueError for a non-numeric or out-of-range port
        if parsed.port:
            origin_base += f":{parsed.port}"
    except (ValueError, TypeError, AttributeError):
        # Only parse failures mean "reject"; a bare except would also swallow
        # KeyboardInterrupt/SystemExit and hide unrelated bugs.
        return False

    return origin_base in ALLOWED_ORIGINS
url, + params=params, + json=json_data, + data=data, + headers=request_headers, + timeout=10 + ) + + try: + parsed_json = response.json() + + # Add timestamp if not present + if isinstance(parsed_json, dict) and "timestamp" not in parsed_json: + parsed_json["timestamp"] = int(time.time() * 1000) + + # Check for HATEOAS compliant error response format and reformat if needed + if not response.ok and isinstance(parsed_json, dict) and "success" in parsed_json and not parsed_json["success"]: + # Check if error is in the expected HATEOAS format + if "error" in parsed_json and not isinstance(parsed_json["error"], dict): + # Convert string error to the proper format + error_message = parsed_json["error"] + parsed_json["error"] = { + "code": f"HTTP_{response.status_code}", + "message": error_message + } + + return parsed_json + + except ValueError: + if response.ok: + return { + "success": False, + "error": { + "code": "NON_JSON_RESPONSE", + "message": "Received non-JSON success response from Ghidra plugin" + }, + "status_code": response.status_code, + "response_text": response.text[:500], + "timestamp": int(time.time() * 1000) + } + else: + return { + "success": False, + "error": { + "code": f"HTTP_{response.status_code}", + "message": f"Non-JSON error response: {response.text[:100]}..." 
def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict:
    """Perform a POST request to a specific Ghidra instance with JSON or text payload

    Args:
        port: Port of the target Ghidra instance
        endpoint: API endpoint path relative to the instance URL
        data: A dict sent as the JSON body (an optional "headers" entry is
            split off and passed as request headers), or any other value
            sent as a plain-text body

    Returns:
        dict: The response dict produced by _make_request
    """
    headers = None
    json_payload = None
    text_payload = None

    if isinstance(data, dict):
        # Copy before separating "headers" so the caller's dict is left untouched
        json_payload = dict(data)
        headers = json_payload.pop("headers", None)
    else:
        text_payload = data

    return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers)
def _flatten_links(target: dict, links) -> None:
    """Copy each HATEOAS link's href onto ``target`` as a ``<name>_url`` key."""
    if not isinstance(links, dict):
        return
    for link_name, link_data in links.items():
        if isinstance(link_data, dict) and "href" in link_data:
            target[f"{link_name}_url"] = link_data["href"]


def _format_disassembly(instructions: list) -> str:
    """Render instruction dicts as 'address: bytes mnemonic operands' lines."""
    lines = []
    for instr in instructions:
        if not isinstance(instr, dict):
            continue
        addr = instr.get("address", "")
        mnemonic = instr.get("mnemonic", "")
        operands = instr.get("operands", "")
        raw_bytes = instr.get("bytes", "")
        # A missing/None or non-string bytes value must not crash the padding
        bytes_str = "" if raw_bytes is None else str(raw_bytes)
        lines.append(f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n")
    return "".join(lines)


def simplify_response(response: dict) -> dict:
    """
    Simplify HATEOAS response data for easier AI agent consumption
    - Removes _links from result entries and exposes each href as "<name>_url"
    - Moves top-level _links hrefs into an "api_links" dict
    - Leaves all other top-level keys (id, instance, timestamp, size, ...) untouched
    - Adds "disassembly_text" for results carrying an "instructions" list
    - Adds "decompiled_text" for results carrying "ccode" or "decompiled"

    Args:
        response: Parsed API response; non-dict values are returned unchanged

    Returns:
        dict: A simplified copy; the input dict and its nested items are not modified
    """
    if not isinstance(response, dict):
        return response

    # Shallow copy; every nested container that gets modified is copied below
    result = response.copy()

    payload = result.get("result")

    if isinstance(payload, list):
        simplified_items = []
        for item in payload:
            if isinstance(item, dict):
                item_copy = item.copy()
                _flatten_links(item_copy, item_copy.pop("_links", None))
                simplified_items.append(item_copy)
            else:
                simplified_items.append(item)
        result["result"] = simplified_items

    elif isinstance(payload, dict):
        result_copy = payload.copy()
        _flatten_links(result_copy, result_copy.pop("_links", None))

        # Text rendering is added next to the structured instructions, which are kept
        instructions = result_copy.get("instructions")
        if isinstance(instructions, list):
            result_copy["disassembly_text"] = _format_disassembly(instructions)

        # Expose decompiled code under one predictable key ("ccode" wins over "decompiled")
        if "ccode" in result_copy:
            result_copy["decompiled_text"] = result_copy["ccode"]
        elif "decompiled" in result_copy:
            result_copy["decompiled_text"] = result_copy["decompiled"]

        result["result"] = result_copy

    # Top-level links are collected under "api_links" instead of per-key suffixes
    links = result.pop("_links", None)
    if isinstance(links, dict):
        api_links = {
            link_name: link_data["href"]
            for link_name, link_data in links.items()
            if isinstance(link_data, dict) and "href" in link_data
        }
        if api_links:
            result["api_links"] = api_links

    return result
f"{url}/plugin-version" + response = requests.get(test_url, timeout=2) + + if not response.ok: + return f"Error: Instance at {url} is not responding properly to HATEOAS API" + + project_info = {"url": url} + + try: + # Check plugin version to ensure compatibility + try: + version_data = response.json() + if "result" in version_data: + result = version_data["result"] + if isinstance(result, dict): + plugin_version = result.get("plugin_version", "") + api_version = result.get("api_version", 0) + + project_info["plugin_version"] = plugin_version + project_info["api_version"] = api_version + + # Verify API version compatibility + if api_version != REQUIRED_API_VERSION: + error_msg = f"API version mismatch: Plugin reports version {api_version}, but bridge requires version {REQUIRED_API_VERSION}" + print(error_msg, file=sys.stderr) + return error_msg + + print(f"Connected to Ghidra plugin version {plugin_version} with API version {api_version}") + except Exception as e: + print(f"Error parsing plugin version: {e}", file=sys.stderr) + + # Get program info from HATEOAS API + info_url = f"{url}/program" + + try: + info_response = requests.get(info_url, timeout=2) + if info_response.ok: + try: + info_data = info_response.json() + if "result" in info_data: + result = info_data["result"] + if isinstance(result, dict): + # Extract project and file from programId (format: "project:/file") + program_id = result.get("programId", "") + if ":" in program_id: + project_name, file_path = program_id.split(":", 1) + project_info["project"] = project_name + # Remove leading slash from file path if present + if file_path.startswith("/"): + file_path = file_path[1:] + project_info["path"] = file_path + + # Get file name directly from the result + project_info["file"] = result.get("name", "") + + # Get other metadata + project_info["language_id"] = result.get("languageId", "") + project_info["compiler_spec_id"] = result.get("compilerSpecId", "") + project_info["image_base"] = 
def _discover_instances(port_range, host=None, timeout=0.5) -> dict:
    """Internal function to discover Ghidra instances by scanning ports

    Each port not already present in active_instances is probed via its
    plugin-version endpoint; ports that answer with a successful
    HATEOAS-style JSON object are passed to register_instance.

    Args:
        port_range: Iterable of port numbers to probe
        host: Host to scan (defaults to the configured ghidra_host)
        timeout: Per-request timeout in seconds

    Returns:
        dict: {"found": number of new instances, "instances": list of
            per-instance report dicts (port, url, versions, project, file, result)}
    """
    found_instances = []
    scan_host = host if host is not None else ghidra_host

    for port in port_range:
        if port in active_instances:
            continue

        url = f"http://{scan_host}:{port}"
        try:
            # Try HATEOAS API via plugin-version endpoint
            response = requests.get(
                f"{url}/plugin-version",
                headers={'Accept': 'application/json',
                         'X-Request-ID': f"discovery-{int(time.time() * 1000)}"},
                timeout=timeout)
        except requests.exceptions.RequestException:
            # Instance not available, just continue
            continue

        if not response.ok:
            continue

        try:
            json_data = response.json()
        except ValueError:
            print(f"Port {port} returned non-HATEOAS response", file=sys.stderr)
            continue

        # Some other service may answer with a JSON list or scalar; indexing
        # that would raise TypeError and abort the whole scan, so reject it here.
        if not isinstance(json_data, dict):
            print(f"Port {port} returned non-HATEOAS response", file=sys.stderr)
            continue

        if not (json_data.get("success") and "result" in json_data):
            continue

        # Looks like a valid HATEOAS API response: register it, then build the report entry
        registration = register_instance(port, url)

        instance_info = {
            "port": port,
            "url": url
        }

        # Extract version info for reporting
        version_info = json_data["result"]
        if isinstance(version_info, dict):
            instance_info["plugin_version"] = version_info.get("plugin_version", "unknown")
            instance_info["api_version"] = version_info.get("api_version", "unknown")
        else:
            instance_info["plugin_version"] = "unknown"
            instance_info["api_version"] = "unknown"

        # Include project details from registered instance in the report
        registered = active_instances.get(port)
        if registered is not None:
            instance_info["project"] = registered.get("project", "")
            instance_info["file"] = registered.get("file", "")

        instance_info["result"] = registration
        found_instances.append(instance_info)

    return {
        "found": len(found_instances),
        "instances": found_instances
    }
def handle_sigint(signum, frame):
    """Signal-handler callback that ends the process immediately with status 0.

    Uses os._exit rather than sys.exit, so the process exits without running
    cleanup handlers or unwinding the stack.

    Args:
        signum: Signal number supplied by the signal machinery (unused)
        frame: Current stack frame supplied by the signal machinery (unused)
    """
    # NOTE(review): presumably installed for SIGINT via signal.signal elsewhere in the file - confirm
    os._exit(0)
# NOTE(review): FastMCP's resource decorator normally takes a URI argument - confirm this bare call registers as intended
@mcp.resource()
def decompiled_function(name: str = None, address: str = None, port: int = None) -> str:
    """Return the decompiled C code of a function as plain text.

    The function is identified by name or by address; when both are given
    the address is the one used to build the request.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)

    Returns:
        str: The decompiled C code, or an error message
    """
    if not (name or address):
        return "Error: Either name or address parameter is required"

    instance_port = _get_instance_port(port)

    selector = address if address else f"by-name/{quote(name)}"
    query = {"syntax_tree": "false", "style": "normalize"}
    simplified = simplify_response(
        safe_get(instance_port, f"functions/{selector}/decompile", query)
    )

    succeeded = (
        isinstance(simplified, dict)
        and simplified.get("success", False)
        and "result" in simplified
    )
    if not succeeded:
        # Prefer the API's own error text over the generic fallback
        failure = "Error: Could not decompile function"
        if isinstance(simplified, dict) and "error" in simplified:
            detail = simplified["error"]
            failure = detail.get("message", failure) if isinstance(detail, dict) else str(detail)
        return failure

    result = simplified["result"]
    if isinstance(result, dict):
        # The code may arrive under any of these keys; first match wins
        for field in ("decompiled_text", "ccode", "decompiled"):
            if field in result:
                return result[field]

    return "Error: Could not extract decompiled code from response"
= None, address: str = None, port: int = None) -> dict: + """Get detailed information about a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Complete function information including signature, parameters, etc. + """ + if not name and not address: + return {"error": "Either name or address parameter is required"} + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_get(port, endpoint) + simplified = simplify_response(response) + + if (not isinstance(simplified, dict) or + not simplified.get("success", False) or + "result" not in simplified): + error = {"error": "Could not get function information"} + if isinstance(simplified, dict) and "error" in simplified: + error["error_details"] = simplified["error"] + return error + + # Return just the function data without API metadata + return simplified["result"] + +@mcp.resource() +def disassembly(name: str = None, address: str = None, port: int = None) -> str: + """Get disassembled instructions for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + str: Formatted disassembly listing as a string + """ + if not name and not address: + return "Error: Either name or address parameter is required" + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}/disassembly" + else: + endpoint = f"functions/by-name/{quote(name)}/disassembly" + + response = safe_get(port, endpoint) + simplified = simplify_response(response) + + if (not isinstance(simplified, dict) or + not simplified.get("success", False) or + "result" not in simplified): + error_message = "Error: 
Could not get disassembly" + if isinstance(simplified, dict) and "error" in simplified: + if isinstance(simplified["error"], dict): + error_message = simplified["error"].get("message", error_message) + else: + error_message = str(simplified["error"]) + return error_message + + # For a resource, we want to directly return just the disassembly text + result = simplified["result"] + + # Check if we have a disassembly_text field already + if isinstance(result, dict) and "disassembly_text" in result: + return result["disassembly_text"] + + # Otherwise if we have raw instructions, format them ourselves + if isinstance(result, dict) and "instructions" in result and isinstance(result["instructions"], list): + disasm_text = "" + for instr in result["instructions"]: + if isinstance(instr, dict): + addr = instr.get("address", "") + mnemonic = instr.get("mnemonic", "") + operands = instr.get("operands", "") + bytes_str = instr.get("bytes", "") + + # Format: address: bytes mnemonic operands + disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" + + return disasm_text + + # If we have a direct disassembly field, try that as well + if isinstance(result, dict) and "disassembly" in result: + return result["disassembly"] + + return "Error: Could not extract disassembly from response" + +# ================= MCP Prompts ================= +# Prompts define reusable templates for LLM interactions + +@mcp.prompt("analyze_function") +def analyze_function_prompt(name: str = None, address: str = None, port: int = None): + """A prompt to guide the LLM through analyzing a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + """ + port = _get_instance_port(port) + + # Get function name if only address is provided + if address and not name: + fn_info = function_info(address=address, port=port) + if isinstance(fn_info, dict) and 
"name" in fn_info: + name = fn_info["name"] + + # Create the template that guides analysis + return { + "prompt": f""" + Analyze the following function: {name or address} + + Decompiled code: + ```c + {decompiled_function(name=name, address=address, port=port)} + ``` + + Disassembly: + ``` + {disassembly(name=name, address=address, port=port)} + ``` + + 1. What is the purpose of this function? + 2. What are the key parameters and their uses? + 3. What are the return values and their meanings? + 4. Are there any security concerns in this implementation? + 5. Describe the algorithm or process being implemented. + """, + "context": { + "function_info": function_info(name=name, address=address, port=port) + } + } + +@mcp.prompt("identify_vulnerabilities") +def identify_vulnerabilities_prompt(name: str = None, address: str = None, port: int = None): + """A prompt to help identify potential vulnerabilities in a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + """ + port = _get_instance_port(port) + + # Get function name if only address is provided + if address and not name: + fn_info = function_info(address=address, port=port) + if isinstance(fn_info, dict) and "name" in fn_info: + name = fn_info["name"] + + # Create the template focused on security analysis + return { + "prompt": f""" + Analyze the following function for security vulnerabilities: {name or address} + + Decompiled code: + ```c + {decompiled_function(name=name, address=address, port=port)} + ``` + + Look for these vulnerability types: + 1. Buffer overflows or underflows + 2. Integer overflow/underflow + 3. Use-after-free or double-free bugs + 4. Format string vulnerabilities + 5. Missing bounds checks + 6. Insecure memory operations + 7. Race conditions or timing issues + 8. 
Input validation problems + + For each potential vulnerability: + - Describe the vulnerability and where it occurs + - Explain the security impact + - Suggest how it could be exploited + - Recommend a fix + """, + "context": { + "function_info": function_info(name=name, address=address, port=port), + "disassembly": disassembly(name=name, address=address, port=port) + } + } + +# ================= MCP Tools ================= +# Since we can't use tool groups, we'll use namespaces in the function names + +# Instance management tools +@mcp.tool() +def instances_list() -> dict: + """List all active Ghidra instances""" + with instances_lock: + return { + "instances": [ + { + "port": port, + "url": info["url"], + "project": info.get("project", ""), + "file": info.get("file", "") + } + for port, info in active_instances.items() + ] + } + +@mcp.tool() +def instances_discover(host: str = None) -> dict: + """Discover available Ghidra instances by scanning ports + + Args: + host: Optional host to scan (default: configured ghidra_host) + + Returns: + dict: Contains 'found' count and 'instances' list with discovery results + """ + return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5) + +@mcp.tool() +def instances_register(port: int, url: str = None) -> str: + """Register a new Ghidra instance + + Args: + port: Port number of the Ghidra instance + url: Optional URL if different from default http://host:port + + Returns: + str: Confirmation message or error + """ + return register_instance(port, url) + +@mcp.tool() +def instances_unregister(port: int) -> str: + """Unregister a Ghidra instance + + Args: + port: Port number of the instance to unregister + + Returns: + str: Confirmation message or error + """ + with instances_lock: + if port in active_instances: + del active_instances[port] + return f"Unregistered instance on port {port}" + return f"No instance found on port {port}" + +@mcp.tool() +def instances_use(port: int) -> str: + """Set the current working 
Ghidra instance + + Args: + port: Port number of the instance to use + + Returns: + str: Confirmation message or error + """ + global current_instance_port + + # First validate that the instance exists and is active + if port not in active_instances: + # Try to register it if not found + register_instance(port) + if port not in active_instances: + return f"Error: No active Ghidra instance found on port {port}" + + # Set as current instance + current_instance_port = port + + # Return information about the selected instance + with instances_lock: + info = active_instances[port] + program = info.get("file", "unknown program") + project = info.get("project", "unknown project") + return f"Now using Ghidra instance on port {port} with {program} in project {project}" + +@mcp.tool() +def instances_current() -> dict: + """Get information about the current working Ghidra instance + + Returns: + dict: Details about the current instance and program + """ + return ghidra_instance(port=current_instance_port) + +# Function tools +@mcp.tool() +def functions_list(offset: int = 0, limit: int = 100, + name_contains: str = None, + name_matches_regex: str = None, + port: int = None) -> dict: + """List functions with filtering and pagination + + Args: + offset: Pagination offset (default: 0) + limit: Maximum items to return (default: 100) + name_contains: Substring name filter (case-insensitive) + name_matches_regex: Regex name filter + port: Specific Ghidra instance port (optional) + + Returns: + dict: List of functions with pagination information + """ + port = _get_instance_port(port) + + params = { + "offset": offset, + "limit": limit + } + if name_contains: + params["name_contains"] = name_contains + if name_matches_regex: + params["name_matches_regex"] = name_matches_regex + + response = safe_get(port, "functions", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + 
simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified + +@mcp.tool() +def functions_get(name: str = None, address: str = None, port: int = None) -> dict: + """Get detailed information about a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Detailed function information + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_get(port, endpoint) + return simplify_response(response) + +@mcp.tool() +def functions_decompile(name: str = None, address: str = None, + syntax_tree: bool = False, style: str = "normalize", + port: int = None) -> dict: + """Get decompiled code for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + syntax_tree: Include syntax tree (default: False) + style: Decompiler style (default: "normalize") + port: Specific Ghidra instance port (optional) + + Returns: + dict: Contains function information and decompiled code + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + params = { + "syntax_tree": str(syntax_tree).lower(), + "style": style + } + + if address: + endpoint = f"functions/{address}/decompile" + else: + endpoint = 
f"functions/by-name/{quote(name)}/decompile" + + response = safe_get(port, endpoint, params) + simplified = simplify_response(response) + + # For AI consumption, make the decompiled code more directly accessible + if "result" in simplified and isinstance(simplified["result"], dict): + if "decompiled" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["decompiled"] + elif "ccode" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["ccode"] + elif "decompiled_text" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["decompiled_text"] + + return simplified + +@mcp.tool() +def functions_disassemble(name: str = None, address: str = None, port: int = None) -> dict: + """Get disassembly for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Contains function information and disassembly text + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}/disassembly" + else: + endpoint = f"functions/by-name/{quote(name)}/disassembly" + + response = safe_get(port, endpoint) + return simplify_response(response) + +@mcp.tool() +def functions_create(address: str, port: int = None) -> dict: + """Create a new function at the specified address + + Args: + address: Memory address in hex format where function starts + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the created function information + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required" + }, + 
"timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "address": address + } + + response = safe_post(port, "functions", payload) + return simplify_response(response) + +@mcp.tool() +def functions_rename(old_name: str = None, address: str = None, new_name: str = "", port: int = None) -> dict: + """Rename a function + + Args: + old_name: Current function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + new_name: New function name + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the updated function information + """ + if not (old_name or address) or not new_name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either old_name or address, and new_name parameters are required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "name": new_name + } + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(old_name)}" + + response = safe_patch(port, endpoint, payload) + return simplify_response(response) + +@mcp.tool() +def functions_set_signature(name: str = None, address: str = None, signature: str = "", port: int = None) -> dict: + """Set function signature/prototype + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + signature: New function signature (e.g., "int func(char *data, int size)") + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the updated function information + """ + if not (name or address) or not signature: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address, and signature parameters are required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + 
payload = { + "signature": signature + } + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_patch(port, endpoint, payload) + return simplify_response(response) + +@mcp.tool() +def functions_get_variables(name: str = None, address: str = None, port: int = None) -> dict: + """Get variables for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Contains function information and list of variables + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}/variables" + else: + endpoint = f"functions/by-name/{quote(name)}/variables" + + response = safe_get(port, endpoint) + return simplify_response(response) + +# Memory tools +@mcp.tool() +def memory_read(address: str, length: int = 16, format: str = "hex", port: int = None) -> dict: + """Read bytes from memory + + Args: + address: Memory address in hex format + length: Number of bytes to read (default: 16) + format: Output format - "hex", "base64", or "string" (default: "hex") + port: Specific Ghidra instance port (optional) + + Returns: + dict: { + "address": original address, + "length": bytes read, + "format": output format, + "hexBytes": the memory contents as hex string, + "rawBytes": the memory contents as base64 string, + "timestamp": response timestamp + } + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + # Use query parameters instead of path 
parameters for more reliable handling + params = { + "address": address, + "length": length, + "format": format + } + + response = safe_get(port, "memory", params) + simplified = simplify_response(response) + + # Ensure the result is simple and directly usable + if "result" in simplified and isinstance(simplified["result"], dict): + result = simplified["result"] + + # Pass through all representations of the bytes + memory_info = { + "success": True, + "address": result.get("address", address), + "length": result.get("bytesRead", length), + "format": format, + "timestamp": simplified.get("timestamp", int(time.time() * 1000)) + } + + # Include all the different byte representations + if "hexBytes" in result: + memory_info["hexBytes"] = result["hexBytes"] + if "rawBytes" in result: + memory_info["rawBytes"] = result["rawBytes"] + + return memory_info + + return simplified + +@mcp.tool() +def memory_write(address: str, bytes_data: str, format: str = "hex", port: int = None) -> dict: + """Write bytes to memory (use with caution) + + Args: + address: Memory address in hex format + bytes_data: Data to write (format depends on 'format' parameter) + format: Input format - "hex", "base64", or "string" (default: "hex") + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with success status + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + if not bytes_data: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Bytes parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "bytes": bytes_data, + "format": format + } + + response = safe_patch(port, f"memory/{address}", payload) + return simplify_response(response) + +# Xrefs tools +@mcp.tool() +def xrefs_list(to_addr: str = None, from_addr: str = 
None, type: str = None, + offset: int = 0, limit: int = 100, port: int = None) -> dict: + """List cross-references with filtering and pagination + + Args: + to_addr: Filter references to this address (hexadecimal) + from_addr: Filter references from this address (hexadecimal) + type: Filter by reference type (e.g. "CALL", "READ", "WRITE") + offset: Pagination offset (default: 0) + limit: Maximum items to return (default: 100) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Cross-references matching the filters + """ + # At least one of the address parameters must be provided + if not to_addr and not from_addr: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either to_addr or from_addr parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + params = { + "offset": offset, + "limit": limit + } + if to_addr: + params["to_addr"] = to_addr + if from_addr: + params["from_addr"] = from_addr + if type: + params["type"] = type + + response = safe_get(port, "xrefs", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified + +# Data tools +@mcp.tool() +def data_list(offset: int = 0, limit: int = 100, addr: str = None, + name: str = None, name_contains: str = None, type: str = None, + port: int = None) -> dict: + """List defined data items with filtering and pagination + + Args: + offset: Pagination offset (default: 0) + limit: Maximum items to return (default: 100) + addr: Filter by address (hexadecimal) + name: Exact name match filter (case-sensitive) + name_contains: Substring name filter (case-insensitive) + type: Filter by data type (e.g. 
"string", "dword") + port: Specific Ghidra instance port (optional) + + Returns: + dict: Data items matching the filters + """ + port = _get_instance_port(port) + + params = { + "offset": offset, + "limit": limit + } + if addr: + params["addr"] = addr + if name: + params["name"] = name + if name_contains: + params["name_contains"] = name_contains + if type: + params["type"] = type + + response = safe_get(port, "data", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified + +@mcp.tool() +def data_create(address: str, data_type: str, size: int = None, port: int = None) -> dict: + """Define a new data item at the specified address + + Args: + address: Memory address in hex format + data_type: Data type (e.g. "string", "dword", "byte") + size: Optional size in bytes for the data item + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the created data information + """ + if not address or not data_type: + return { + "success": False, + "error": "Address and data_type parameters are required", + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "address": address, + "type": data_type + } + + if size is not None: + payload["size"] = size + + response = safe_post(port, "data", payload) + return simplify_response(response) + +@mcp.tool() +def data_rename(address: str, name: str, port: int = None) -> dict: + """Rename a data item + + Args: + address: Memory address in hex format + name: New name for the data item + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the updated data information + """ + if not address or not name: + return { + "success": False, + "error": { + 
"code": "MISSING_PARAMETER", + "message": "Address and name parameters are required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "address": address, + "newName": name + } + + response = safe_post(port, "data", payload) + return simplify_response(response) + +# Analysis tools +@mcp.tool() +def analysis_run(port: int = None, analysis_options: dict = None) -> dict: + """Run analysis on the current program + + Args: + analysis_options: Dictionary of analysis options to enable/disable + (e.g. {"functionRecovery": True, "dataRefs": False}) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Analysis operation result with status + """ + port = _get_instance_port(port) + response = safe_post(port, "analysis", analysis_options or {}) + return simplify_response(response) + +@mcp.tool() +def analysis_get_callgraph(function: str = None, max_depth: int = 3, port: int = None) -> dict: + """Get function call graph visualization data + + Args: + function: Starting function name or address (None starts from entry point) + max_depth: Maximum call depth to analyze (default: 3) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Graph data with nodes and edges + """ + port = _get_instance_port(port) + + params = {"max_depth": max_depth} + if function: + params["function"] = function + + response = safe_get(port, "analysis/callgraph", params) + return simplify_response(response) + +@mcp.tool() +def analysis_get_dataflow(address: str, direction: str = "forward", max_steps: int = 50, port: int = None) -> dict: + """Perform data flow analysis from an address + + Args: + address: Starting address in hex format + direction: "forward" or "backward" (default: "forward") + max_steps: Maximum analysis steps (default: 50) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Data flow analysis results + """ + if not address: + return { + "success": False, + "error": "Address parameter is 
required", + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + params = { + "address": address, + "direction": direction, + "max_steps": max_steps + } + + response = safe_get(port, "analysis/dataflow", params) + return simplify_response(response) + +# ================= Startup ================= + +if __name__ == "__main__": + register_instance(DEFAULT_GHIDRA_PORT, + f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}") + + # Use quick discovery on startup + _discover_instances(QUICK_DISCOVERY_RANGE) + + # Start background discovery thread + discovery_thread = threading.Thread( + target=periodic_discovery, + daemon=True, + name="GhydraMCP-Discovery" + ) + discovery_thread.start() + + signal.signal(signal.SIGINT, handle_sigint) + mcp.run(transport="stdio") \ No newline at end of file diff --git a/refactoring_proposal.md b/refactoring_proposal.md new file mode 100644 index 0000000..43ac518 --- /dev/null +++ b/refactoring_proposal.md @@ -0,0 +1,261 @@ +# GhydraMCP Bridge Refactoring Proposal + +## Current Issues + +The current bridge implementation exposes all functionality as MCP tools, which creates several problems: + +1. **Discoverability**: With dozens of tool functions, it's difficult for AI agents to identify the correct tool to use for a specific task. + +2. **Consistency**: The API surface is large and not organized by conceptual resources, making it harder to understand what's related. + +3. **Context Loading**: Many operations require repeated loading of program information that could be provided more efficiently as resources. + +4. **Default Selection**: The current approach requires explicit port selection for each operation, instead of following a "current working instance" pattern. + +## Proposed MCP-Oriented Refactoring + +Restructure the bridge to follow MCP patterns more closely: + +### 1. Resources (for Context Loading) + +Resources provide information that can be loaded directly into the LLM's context. 
+ +```python +@mcp.resource() +def ghidra_instance(port: int = None) -> dict: + """Get information about a Ghidra instance or the current working instance + + Args: + port: Specific Ghidra instance port (optional, uses current if omitted) + + Returns: + dict: Detailed information about the Ghidra instance and loaded program + """ + # Implementation that gets instance info and the current program details + # from the currently selected "working" instance or a specific port +``` + +```python +@mcp.resource() +def decompiled_function(name: str = None, address: str = None) -> str: + """Get decompiled C code for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + + Returns: + str: The decompiled C code as a string + """ + # Implementation that only returns the decompiled text directly +``` + +```python +@mcp.resource() +def function_info(name: str = None, address: str = None) -> dict: + """Get detailed information about a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + + Returns: + dict: Complete function information including signature, parameters, etc. + """ + # Implementation that returns detailed function information +``` + +```python +@mcp.resource() +def disassembly(name: str = None, address: str = None) -> str: + """Get disassembled instructions for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + + Returns: + str: Formatted disassembly listing as a string + """ + # Implementation that returns formatted text disassembly +``` + +### 2. Prompts (for Interaction Patterns) + +Prompts define reusable templates for LLM interactions, making common workflows easier. 
+ +```python +@mcp.prompt("analyze_function") +def analyze_function_prompt(name: str = None, address: str = None): + """A prompt that guides the LLM through analyzing a function's purpose + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + """ + # Implementation returns a prompt template with decompiled code and disassembly + # that helps the LLM systematically analyze a function + return { + "prompt": f""" + Analyze the following function: {name or address} + + Decompiled code: + ```c + {decompiled_function(name=name, address=address)} + ``` + + Disassembly: + ``` + {disassembly(name=name, address=address)} + ``` + + 1. What is the purpose of this function? + 2. What are the key parameters and their uses? + 3. What are the return values and their meanings? + 4. Are there any security concerns in this implementation? + 5. Describe the algorithm or process being implemented. + """, + "context": { + "function_info": function_info(name=name, address=address) + } + } +``` + +```python +@mcp.prompt("identify_vulnerabilities") +def identify_vulnerabilities_prompt(name: str = None, address: str = None): + """A prompt that helps the LLM identify potential vulnerabilities in a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + """ + # Implementation returns a prompt focused on finding security issues +``` + +### 3. Tools (for Function Selection) + +Tools are organized by domain concepts rather than just mirroring the low-level API. 
+ +```python +@mcp.tool_group("instances") +class InstanceTools: + @mcp.tool() + def list() -> dict: + """List all active Ghidra instances""" + return list_instances() + + @mcp.tool() + def discover() -> dict: + """Discover available Ghidra instances""" + return discover_instances() + + @mcp.tool() + def register(port: int, url: str = None) -> str: + """Register a new Ghidra instance""" + return register_instance(port, url) + + @mcp.tool() + def use(port: int) -> str: + """Set the current working Ghidra instance""" + # Implementation that sets the default instance + global current_instance_port + current_instance_port = port + return f"Now using Ghidra instance on port {port}" +``` + +```python +@mcp.tool_group("functions") +class FunctionTools: + @mcp.tool() + def list(offset: int = 0, limit: int = 100, **filters) -> dict: + """List functions with filtering and pagination""" + # Implementation that uses the current instance + return list_functions(port=current_instance_port, offset=offset, limit=limit, **filters) + + @mcp.tool() + def get(name: str = None, address: str = None) -> dict: + """Get detailed information about a function""" + return get_function(port=current_instance_port, name=name, address=address) + + @mcp.tool() + def create(address: str) -> dict: + """Create a new function at the specified address""" + return create_function(port=current_instance_port, address=address) + + @mcp.tool() + def rename(name: str = None, address: str = None, new_name: str = "") -> dict: + """Rename a function""" + return rename_function(port=current_instance_port, + name=name, address=address, new_name=new_name) + + @mcp.tool() + def set_signature(name: str = None, address: str = None, signature: str = "") -> dict: + """Set a function's signature/prototype""" + return set_function_signature(port=current_instance_port, + name=name, address=address, signature=signature) +``` + +Similar tool groups would be created for: +- `data`: Data manipulation tools +- `memory`: Memory 
reading/writing tools +- `analysis`: Program analysis tools +- `xrefs`: Cross-reference navigation tools +- `symbols`: Symbol management tools +- `variables`: Variable manipulation tools + +### 4. Simplified Instance Management + +Add a "current working instance" pattern: + +```python +# Global state for the current instance +current_instance_port = DEFAULT_GHIDRA_PORT + +# Helper function to get the current instance or validate a specific port +def _get_instance_port(port=None): + port = port or current_instance_port + # Validate that the instance exists and is active + if port not in active_instances: + # Try to register it if not found + register_instance(port) + if port not in active_instances: + raise ValueError(f"No active Ghidra instance on port {port}") + return port + +# All tools would use this helper, falling back to the current instance if no port is specified +def read_memory(address: str, length: int = 16, format: str = "hex", port: int = None) -> dict: + """Read bytes from memory + + Args: + address: Memory address in hex format + length: Number of bytes to read (default: 16) + format: Output format (default: "hex") + port: Specific Ghidra instance port (optional, uses current if omitted) + + Returns: + dict: Memory content in the requested format + """ + port = _get_instance_port(port) + # Rest of implementation... +``` + +## Migration Strategy + +1. Create a new MCP class structure in a separate file +2. Implement resource loaders for key items (functions, data, memory regions) +3. Implement prompt templates for common tasks +4. Organize tools into logical groups by domain concept +5. Add a current instance selection mechanism +6. Update documentation with clear examples of the new patterns +7. Create backward compatibility shims if needed + +## Benefits of This Approach + +1. **Better Discoverability**: Logical grouping helps agents find the right tool +2. **Context Efficiency**: Resources load just what's needed without extra metadata +3. 
**Streamlined Interaction**: Tools follow consistent patterns with sensible defaults +4. **Prompt Templates**: Common patterns are codified in reusable prompts +5. **More LLM-friendly**: Outputs optimized for consumption by language models + +The refactored API would be easier to use, more efficient, and better aligned with MCP best practices, while maintaining all the current functionality. \ No newline at end of file diff --git a/refactoring_sample.py b/refactoring_sample.py new file mode 100644 index 0000000..36faada --- /dev/null +++ b/refactoring_sample.py @@ -0,0 +1,1286 @@ +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "mcp==1.6.0", +# "requests==2.32.3", +# ] +# /// +# GhydraMCP Bridge for Ghidra HATEOAS API - Refactored for MCP optimization +# This provides a sample implementation of the refactoring proposal + +import os +import signal +import sys +import threading +import time +from threading import Lock +from typing import Dict, List, Optional, Union, Any +from urllib.parse import quote, urlencode + +import requests +from mcp.server.fastmcp import FastMCP + +# ================= Core Infrastructure ================= + +# Allowed origins for CORS/CSRF protection +ALLOWED_ORIGINS = os.environ.get( + "GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",") + +# Track active Ghidra instances (port -> info dict) +active_instances: Dict[int, dict] = {} +instances_lock = Lock() +DEFAULT_GHIDRA_PORT = 8192 +DEFAULT_GHIDRA_HOST = "localhost" +# Port ranges for scanning +QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) +FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) + +# Version information +BRIDGE_VERSION = "v2.0.0-beta.1" +REQUIRED_API_VERSION = 2 + +# Global state for the current instance +current_instance_port = DEFAULT_GHIDRA_PORT + +instructions = """ +GhydraMCP allows interacting with multiple Ghidra SRE instances. 
Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware. + +First, run `instances.discover()` to find open Ghidra instances. Then use `instances.use(port)` to set your working instance. +""" + +mcp = FastMCP("GhydraMCP", version=BRIDGE_VERSION, instructions=instructions) + +ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST) + +# Helper function to get the current instance or validate a specific port +def _get_instance_port(port=None): + """Internal helper to get the current instance port or validate a specific port""" + port = port or current_instance_port + # Validate that the instance exists and is active + if port not in active_instances: + # Try to register it if not found + register_instance(port) + if port not in active_instances: + raise ValueError(f"No active Ghidra instance on port {port}") + return port + +# HTTP request helpers +def get_instance_url(port: int) -> str: + """Get URL for a Ghidra instance by port""" + with instances_lock: + if port in active_instances: + return active_instances[port]["url"] + + if 8192 <= port <= 65535: + register_instance(port) + if port in active_instances: + return active_instances[port]["url"] + + return f"http://{ghidra_host}:{port}" + +def validate_origin(headers: dict) -> bool: + """Validate request origin against allowed origins""" + origin = headers.get("Origin") + if not origin: + # No origin header - allow (browser same-origin policy applies) + return True + + # Parse origin to get scheme+hostname + try: + parsed = urlparse(origin) + origin_base = f"{parsed.scheme}://{parsed.hostname}" + if parsed.port: + origin_base += f":{parsed.port}" + except: + return False + + return origin_base in ALLOWED_ORIGINS + +def _make_request(method: str, port: int, endpoint: str, params: dict = None, + json_data: dict = None, data: str = None, + headers: dict = None) -> dict: + """Internal helper to make HTTP requests and handle common errors.""" + url = 
f"{get_instance_url(port)}/{endpoint}" + + # Set up headers according to HATEOAS API expected format + request_headers = { + 'Accept': 'application/json', + 'X-Request-ID': f"mcp-bridge-{int(time.time() * 1000)}" + } + + if headers: + request_headers.update(headers) + + is_state_changing = method.upper() in ["POST", "PUT", "PATCH", "DELETE"] + if is_state_changing: + check_headers = json_data.get("headers", {}) if isinstance( + json_data, dict) else (headers or {}) + if not validate_origin(check_headers): + return { + "success": False, + "error": { + "code": "ORIGIN_NOT_ALLOWED", + "message": "Origin not allowed for state-changing request" + }, + "status_code": 403, + "timestamp": int(time.time() * 1000) + } + if json_data is not None: + request_headers['Content-Type'] = 'application/json' + elif data is not None: + request_headers['Content-Type'] = 'text/plain' + + try: + response = requests.request( + method, + url, + params=params, + json=json_data, + data=data, + headers=request_headers, + timeout=10 + ) + + try: + parsed_json = response.json() + + # Add timestamp if not present + if isinstance(parsed_json, dict) and "timestamp" not in parsed_json: + parsed_json["timestamp"] = int(time.time() * 1000) + + # Check for HATEOAS compliant error response format and reformat if needed + if not response.ok and isinstance(parsed_json, dict) and "success" in parsed_json and not parsed_json["success"]: + # Check if error is in the expected HATEOAS format + if "error" in parsed_json and not isinstance(parsed_json["error"], dict): + # Convert string error to the proper format + error_message = parsed_json["error"] + parsed_json["error"] = { + "code": f"HTTP_{response.status_code}", + "message": error_message + } + + return parsed_json + + except ValueError: + if response.ok: + return { + "success": False, + "error": { + "code": "NON_JSON_RESPONSE", + "message": "Received non-JSON success response from Ghidra plugin" + }, + "status_code": response.status_code, + 
"response_text": response.text[:500], + "timestamp": int(time.time() * 1000) + } + else: + return { + "success": False, + "error": { + "code": f"HTTP_{response.status_code}", + "message": f"Non-JSON error response: {response.text[:100]}..." + }, + "status_code": response.status_code, + "response_text": response.text[:500], + "timestamp": int(time.time() * 1000) + } + + except requests.exceptions.Timeout: + return { + "success": False, + "error": { + "code": "REQUEST_TIMEOUT", + "message": "Request timed out" + }, + "status_code": 408, + "timestamp": int(time.time() * 1000) + } + except requests.exceptions.ConnectionError: + return { + "success": False, + "error": { + "code": "CONNECTION_ERROR", + "message": f"Failed to connect to Ghidra instance at {url}" + }, + "status_code": 503, + "timestamp": int(time.time() * 1000) + } + except Exception as e: + return { + "success": False, + "error": { + "code": "UNEXPECTED_ERROR", + "message": f"An unexpected error occurred: {str(e)}" + }, + "exception": e.__class__.__name__, + "timestamp": int(time.time() * 1000) + } + +def safe_get(port: int, endpoint: str, params: dict = None) -> dict: + """Make GET request to Ghidra instance""" + return _make_request("GET", port, endpoint, params=params) + +def safe_put(port: int, endpoint: str, data: dict) -> dict: + """Make PUT request to Ghidra instance with JSON payload""" + headers = data.pop("headers", None) if isinstance(data, dict) else None + return _make_request("PUT", port, endpoint, json_data=data, headers=headers) + +def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict: + """Perform a POST request to a specific Ghidra instance with JSON or text payload""" + headers = None + json_payload = None + text_payload = None + + if isinstance(data, dict): + headers = data.pop("headers", None) + json_payload = data + else: + text_payload = data + + return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers) + +def 
def safe_patch(port: int, endpoint: str, data: dict) -> dict:
    """Perform a PATCH request to a specific Ghidra instance with JSON payload.

    A ``"headers"`` entry in ``data`` is sent as HTTP headers instead of in
    the body; ``data`` itself is left unmodified.
    """
    headers = None
    if isinstance(data, dict):
        # Work on a copy so the caller's dict keeps its "headers" entry.
        data = dict(data)
        headers = data.pop("headers", None)
    return _make_request("PATCH", port, endpoint, json_data=data, headers=headers)


def safe_delete(port: int, endpoint: str) -> dict:
    """Perform a DELETE request to a specific Ghidra instance"""
    return _make_request("DELETE", port, endpoint)


def _href_map(links) -> dict:
    """Map link name -> href for each well-formed entry of a ``_links`` object."""
    if not isinstance(links, dict):
        return {}
    return {
        name: link["href"]
        for name, link in links.items()
        if isinstance(link, dict) and "href" in link
    }


def _without_links(item: dict) -> dict:
    """Copy ``item``, replacing ``_links`` with flat ``<name>_url`` entries."""
    flat = item.copy()
    for name, href in _href_map(flat.pop("_links", None)).items():
        flat[f"{name}_url"] = href
    return flat


def _disassembly_text(instructions: list) -> str:
    """Render instruction dicts as ``address: bytes mnemonic operands`` lines."""
    lines = []
    for instr in instructions:
        if not isinstance(instr, dict):
            continue
        raw_bytes = instr.get("bytes", "")
        # A null or non-string "bytes" value used to crash on .ljust().
        bytes_str = "" if raw_bytes is None else str(raw_bytes)
        addr = instr.get("address", "")
        mnemonic = instr.get("mnemonic", "")
        operands = instr.get("operands", "")
        lines.append(f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n")
    return "".join(lines)


def simplify_response(response: dict) -> dict:
    """Simplify a HATEOAS response for easier AI agent consumption.

    - ``_links`` objects are removed; each ``href`` is re-exposed as
      ``<name>_url`` on result entries and under ``api_links`` at top level.
    - A result object with an ``instructions`` list gains ``disassembly_text``.
    - A result object with ``ccode`` (or else ``decompiled``) gains
      ``decompiled_text``.

    Args:
        response: Parsed response; non-dict values are returned unchanged.

    Returns:
        A simplified shallow copy; ``response`` itself is not modified.
    """
    if not isinstance(response, dict):
        return response

    result = response.copy()
    payload = result.get("result")

    if isinstance(payload, list):
        result["result"] = [
            _without_links(item) if isinstance(item, dict) else item
            for item in payload
        ]
    elif isinstance(payload, dict):
        flat = _without_links(payload)

        # Keep the structured instructions and add a text rendering.
        if isinstance(flat.get("instructions"), list):
            flat["disassembly_text"] = _disassembly_text(flat["instructions"])

        # Expose decompiled code under one predictable key.
        if "ccode" in flat:
            flat["decompiled_text"] = flat["ccode"]
        elif "decompiled" in flat:
            flat["decompiled_text"] = flat["decompiled"]

        result["result"] = flat

    api_links = _href_map(result.pop("_links", None))
    if api_links:
        result["api_links"] = api_links

    return result
def _program_metadata(result: dict) -> dict:
    """Extract the registry fields from a ``/program`` result object."""
    metadata = {}

    # programId has the format "project:/file"
    program_id = result.get("programId", "")
    if ":" in program_id:
        project_name, file_path = program_id.split(":", 1)
        metadata["project"] = project_name
        # Remove leading slash from file path if present
        if file_path.startswith("/"):
            file_path = file_path[1:]
        metadata["path"] = file_path

    metadata["file"] = result.get("name", "")
    metadata["language_id"] = result.get("languageId", "")
    metadata["compiler_spec_id"] = result.get("compilerSpecId", "")
    metadata["image_base"] = result.get("image_base", "")

    # Store _links from result for HATEOAS navigation
    if "_links" in result:
        metadata["_links"] = result.get("_links", {})
    return metadata


def register_instance(port: int, url: str = None) -> str:
    """Register a new Ghidra instance

    Probes ``<url>/plugin-version``, refuses plugins whose API version
    differs from ``REQUIRED_API_VERSION``, collects program details from
    ``<url>/program`` on a best-effort basis, and stores the result in
    ``active_instances[port]``.

    Args:
        port: Port number of the Ghidra instance
        url: Optional URL if different from default http://host:port

    Returns:
        str: Confirmation message or error
    """
    if url is None:
        url = f"http://{ghidra_host}:{port}"

    try:
        # Check for HATEOAS API by checking plugin-version endpoint
        response = requests.get(f"{url}/plugin-version", timeout=2)
        if not response.ok:
            return f"Error: Instance at {url} is not responding properly to HATEOAS API"

        project_info = {"url": url}

        # Check plugin version to ensure compatibility
        try:
            version_data = response.json()
            if "result" in version_data:
                result = version_data["result"]
                if isinstance(result, dict):
                    plugin_version = result.get("plugin_version", "")
                    api_version = result.get("api_version", 0)

                    project_info["plugin_version"] = plugin_version
                    project_info["api_version"] = api_version

                    if api_version != REQUIRED_API_VERSION:
                        error_msg = f"API version mismatch: Plugin reports version {api_version}, but bridge requires version {REQUIRED_API_VERSION}"
                        print(error_msg, file=sys.stderr)
                        return error_msg

                    # Diagnostics go to stderr like every other message in
                    # this module. NOTE(review): presumably stdout carries
                    # the MCP stdio protocol stream - confirm transport.
                    print(f"Connected to Ghidra plugin version {plugin_version} with API version {api_version}",
                          file=sys.stderr)
        except Exception as e:
            print(f"Error parsing plugin version: {e}", file=sys.stderr)

        # Program details are optional: any failure is reported and the
        # instance is registered with whatever was collected so far.
        try:
            info_response = requests.get(f"{url}/program", timeout=2)
            if info_response.ok:
                try:
                    info_data = info_response.json()
                    if "result" in info_data:
                        result = info_data["result"]
                        if isinstance(result, dict):
                            project_info.update(_program_metadata(result))
                except Exception as e:
                    print(f"Error parsing info endpoint: {e}", file=sys.stderr)
        except Exception as e:
            print(f"Error connecting to info endpoint: {e}", file=sys.stderr)

        with instances_lock:
            active_instances[port] = project_info

        return f"Registered instance on port {port} at {url}"
    except Exception as e:
        return f"Error: Could not connect to instance at {url}: {str(e)}"
def _discover_instances(port_range, host=None, timeout=0.5) -> dict:
    """Scan ``port_range`` for Ghidra instances that are not yet registered.

    Args:
        port_range: Iterable of ports to probe.
        host: Host to scan; ``None`` selects the configured ``ghidra_host``.
        timeout: Per-port probe timeout in seconds.

    Returns:
        dict: ``found`` (count) and ``instances`` (one report per newly
        probed instance, including the ``register_instance`` message).
    """
    target_host = ghidra_host if host is None else host
    discovered = []

    for port in port_range:
        # Already registered ports are not probed again.
        if port in active_instances:
            continue

        url = f"http://{target_host}:{port}"
        probe_headers = {
            'Accept': 'application/json',
            'X-Request-ID': f"discovery-{int(time.time() * 1000)}",
        }
        try:
            # Try HATEOAS API via plugin-version endpoint
            response = requests.get(f"{url}/plugin-version",
                                    headers=probe_headers,
                                    timeout=timeout)
            if not response.ok:
                continue

            try:
                json_data = response.json()
                looks_valid = ("success" in json_data and json_data["success"]
                               and "result" in json_data)
                if not looks_valid:
                    continue

                registration = register_instance(port, url)
                report = {"port": port, "url": url}

                # Version info comes straight from the probe response.
                version = json_data["result"]
                if isinstance(version, dict):
                    report["plugin_version"] = version.get("plugin_version", "unknown")
                    report["api_version"] = version.get("api_version", "unknown")
                else:
                    report["plugin_version"] = "unknown"
                    report["api_version"] = "unknown"

                # Project details exist only if registration succeeded.
                if port in active_instances:
                    registered = active_instances[port]
                    report["project"] = registered.get("project", "")
                    report["file"] = registered.get("file", "")

                report["result"] = registration
                discovered.append(report)
            except (ValueError, KeyError):
                # Not a valid JSON response or missing expected keys
                print(f"Port {port} returned non-HATEOAS response", file=sys.stderr)
                continue

        except requests.exceptions.RequestException:
            # Instance not available, just continue
            continue

    return {"found": len(discovered), "instances": discovered}
def periodic_discovery():
    """Periodically discover new instances and drop unreachable ones.

    Runs forever: every 30 seconds it scans ``FULL_DISCOVERY_RANGE``, then
    probes each registered instance, removing those whose
    ``/plugin-version`` endpoint fails and refreshing program metadata for
    the others. All diagnostics are written to stderr.
    """
    while True:
        try:
            _discover_instances(FULL_DISCOVERY_RANGE, timeout=0.5)

            with instances_lock:
                ports_to_remove = []
                for port, info in active_instances.items():
                    url = info["url"]
                    try:
                        # Check HATEOAS API via plugin-version endpoint
                        response = requests.get(f"{url}/plugin-version", timeout=1)
                        if not response.ok:
                            ports_to_remove.append(port)
                            continue

                        # Update program info if available (especially to get project name)
                        try:
                            info_response = requests.get(f"{url}/program", timeout=1)
                            if info_response.ok:
                                try:
                                    info_data = info_response.json()
                                    if "result" in info_data:
                                        result = info_data["result"]
                                        if isinstance(result, dict):
                                            # programId has the format "project:/file"
                                            program_id = result.get("programId", "")
                                            if ":" in program_id:
                                                project_name, file_path = program_id.split(":", 1)
                                                info["project"] = project_name
                                                # Remove leading slash from file path if present
                                                if file_path.startswith("/"):
                                                    file_path = file_path[1:]
                                                info["path"] = file_path

                                            info["file"] = result.get("name", "")
                                            info["language_id"] = result.get("languageId", "")
                                            info["compiler_spec_id"] = result.get("compilerSpecId", "")
                                            info["image_base"] = result.get("image_base", "")
                                except Exception as e:
                                    print(f"Error parsing info endpoint during discovery: {e}", file=sys.stderr)
                        except Exception as e:
                            # Non-critical: keep the instance, but report the
                            # failed refresh instead of hiding it.
                            print(f"Error refreshing program info on port {port}: {e}", file=sys.stderr)

                    except requests.exceptions.RequestException:
                        ports_to_remove.append(port)

                # Deleted after the loop so the dict is not resized while
                # it is being iterated.
                for port in ports_to_remove:
                    del active_instances[port]
                    print(f"Removed unreachable instance on port {port}", file=sys.stderr)
        except Exception as e:
            print(f"Error in periodic discovery: {e}", file=sys.stderr)

        time.sleep(30)
@mcp.resource()
def ghidra_instance(port: int = None) -> dict:
    """Summarise a Ghidra instance and the program it has loaded.

    Args:
        port: Specific Ghidra instance port (optional, uses current if omitted)

    Returns:
        dict: Port, URL and program details (name, id, language, compiler,
        base address, memory size, analysis flag, plus the project name when
        registered), or a dict with a single ``error`` key on failure.
    """
    port = _get_instance_port(port)
    response = safe_get(port, "program")

    succeeded = isinstance(response, dict) and response.get("success", False)
    if not succeeded:
        return {"error": f"Unable to access Ghidra instance on port {port}"}

    program = response.get("result", {})
    if not isinstance(program, dict):
        return {"error": "Invalid response format from Ghidra instance"}

    # Only the most relevant fields are kept for the resource.
    summary = {
        "port": port,
        "url": get_instance_url(port),
        "program_name": program.get("name", "unknown"),
        "program_id": program.get("programId", "unknown"),
        "language": program.get("languageId", "unknown"),
        "compiler": program.get("compilerSpecId", "unknown"),
        "base_address": program.get("imageBase", "0x0"),
        "memory_size": program.get("memorySize", 0),
        "analysis_complete": program.get("analysisComplete", False),
    }

    # The project name comes from the registry, not from the response.
    registered = active_instances[port]
    if "project" in registered:
        summary["project"] = registered["project"]

    return summary
@mcp.resource()
def decompiled_function(name: str = None, address: str = None, port: int = None) -> str:
    """Return the decompiled C code of one function.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)

    Returns:
        str: The decompiled C code, or an error message
    """
    if not (name or address):
        return "Error: Either name or address parameter is required"

    port = _get_instance_port(port)

    # An address takes precedence over a name when both are given.
    endpoint = (f"functions/{address}/decompile" if address
                else f"functions/by-name/{quote(name)}/decompile")
    query = {"syntax_tree": "false", "style": "normalize"}
    simplified = simplify_response(safe_get(port, endpoint, query))

    usable = (isinstance(simplified, dict)
              and simplified.get("success", False)
              and "result" in simplified)
    if not usable:
        message = "Error: Could not decompile function"
        if isinstance(simplified, dict) and "error" in simplified:
            detail = simplified["error"]
            message = detail.get("message", message) if isinstance(detail, dict) else str(detail)
        return message

    # The code may live under any of these keys depending on the endpoint.
    body = simplified["result"]
    if isinstance(body, dict):
        for field in ("decompiled_text", "ccode", "decompiled"):
            if field in body:
                return body[field]

    return "Error: Could not extract decompiled code from response"


@mcp.resource()
def function_info(name: str = None, address: str = None, port: int = None) -> dict:
    """Return the function record the API holds for one function.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: The simplified ``result`` object of the response, or a dict
        with ``error`` (and ``error_details`` when the API supplied one).
    """
    if not (name or address):
        return {"error": "Either name or address parameter is required"}

    port = _get_instance_port(port)

    endpoint = (f"functions/{address}" if address
                else f"functions/by-name/{quote(name)}")
    simplified = simplify_response(safe_get(port, endpoint))

    usable = (isinstance(simplified, dict)
              and simplified.get("success", False)
              and "result" in simplified)
    if usable:
        # Return just the function data without API metadata
        return simplified["result"]

    failure = {"error": "Could not get function information"}
    if isinstance(simplified, dict) and "error" in simplified:
        failure["error_details"] = simplified["error"]
    return failure
@mcp.resource()
def disassembly(name: str = None, address: str = None, port: int = None) -> str:
    """Return the disassembly listing of one function as text.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)

    Returns:
        str: One ``address: bytes mnemonic operands`` line per instruction,
        or an error message
    """
    if not (name or address):
        return "Error: Either name or address parameter is required"

    port = _get_instance_port(port)

    endpoint = (f"functions/{address}/disassembly" if address
                else f"functions/by-name/{quote(name)}/disassembly")
    simplified = simplify_response(safe_get(port, endpoint))

    usable = (isinstance(simplified, dict)
              and simplified.get("success", False)
              and "result" in simplified)
    if not usable:
        message = "Error: Could not get disassembly"
        if isinstance(simplified, dict) and "error" in simplified:
            detail = simplified["error"]
            message = detail.get("message", message) if isinstance(detail, dict) else str(detail)
        return message

    listing = simplified["result"]
    if isinstance(listing, dict):
        # Preferred: text already rendered by simplify_response.
        if "disassembly_text" in listing:
            return listing["disassembly_text"]

        # Otherwise render raw instruction records ourselves.
        instructions = listing.get("instructions")
        if "instructions" in listing and isinstance(instructions, list):
            rendered = []
            for instr in instructions:
                if not isinstance(instr, dict):
                    continue
                addr = instr.get("address", "")
                mnemonic = instr.get("mnemonic", "")
                operands = instr.get("operands", "")
                bytes_str = instr.get("bytes", "")
                rendered.append(f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n")
            return "".join(rendered)

        # Last resort: a direct disassembly field.
        if "disassembly" in listing:
            return listing["disassembly"]

    return "Error: Could not extract disassembly from response"
@mcp.prompt("analyze_function")
def analyze_function_prompt(name: str = None, address: str = None, port: int = None):
    """Build a prompt that walks the LLM through analysing one function.

    The prompt embeds the function's decompiled code and disassembly; the
    function record is attached under ``context``.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)
    """
    port = _get_instance_port(port)

    # With only an address, look the name up so the prompt can show it.
    if address and not name:
        looked_up = function_info(address=address, port=port)
        if isinstance(looked_up, dict) and "name" in looked_up:
            name = looked_up["name"]

    code = decompiled_function(name=name, address=address, port=port)
    listing = disassembly(name=name, address=address, port=port)
    prompt_text = f"""
        Analyze the following function: {name or address}

        Decompiled code:
        ```c
        {code}
        ```

        Disassembly:
        ```
        {listing}
        ```

        1. What is the purpose of this function?
        2. What are the key parameters and their uses?
        3. What are the return values and their meanings?
        4. Are there any security concerns in this implementation?
        5. Describe the algorithm or process being implemented.
        """

    return {
        "prompt": prompt_text,
        "context": {
            "function_info": function_info(name=name, address=address, port=port)
        }
    }
@mcp.prompt("identify_vulnerabilities")
def identify_vulnerabilities_prompt(name: str = None, address: str = None, port: int = None):
    """Build a prompt that asks the LLM to audit one function for vulnerabilities.

    The prompt embeds the decompiled code; the function record and the
    disassembly are attached under ``context``.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)
    """
    port = _get_instance_port(port)

    # With only an address, look the name up so the prompt can show it.
    if address and not name:
        looked_up = function_info(address=address, port=port)
        if isinstance(looked_up, dict) and "name" in looked_up:
            name = looked_up["name"]

    code = decompiled_function(name=name, address=address, port=port)
    prompt_text = f"""
        Analyze the following function for security vulnerabilities: {name or address}

        Decompiled code:
        ```c
        {code}
        ```

        Look for these vulnerability types:
        1. Buffer overflows or underflows
        2. Integer overflow/underflow
        3. Use-after-free or double-free bugs
        4. Format string vulnerabilities
        5. Missing bounds checks
        6. Insecure memory operations
        7. Race conditions or timing issues
        8. Input validation problems

        For each potential vulnerability:
        - Describe the vulnerability and where it occurs
        - Explain the security impact
        - Suggest how it could be exploited
        - Recommend a fix
        """

    return {
        "prompt": prompt_text,
        "context": {
            "function_info": function_info(name=name, address=address, port=port),
            "disassembly": disassembly(name=name, address=address, port=port)
        }
    }
@mcp.tool_group("instances")
class InstanceTools:
    """Tools for managing Ghidra instances"""

    @mcp.tool()
    def list() -> dict:
        """List every registered Ghidra instance.

        Returns:
            dict: ``instances`` - one entry per instance with its port, URL,
            project and file.
        """
        with instances_lock:
            entries = []
            for port, info in active_instances.items():
                entries.append({
                    "port": port,
                    "url": info["url"],
                    "project": info.get("project", ""),
                    "file": info.get("file", ""),
                })
            return {"instances": entries}

    @mcp.tool()
    def discover(host: str = None) -> dict:
        """Scan the quick discovery port range for Ghidra instances.

        Args:
            host: Optional host to scan (default: configured ghidra_host)

        Returns:
            dict: Contains 'found' count and 'instances' list with discovery results
        """
        return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5)

    @mcp.tool()
    def register(port: int, url: str = None) -> str:
        """Register a Ghidra instance by port.

        Args:
            port: Port number of the Ghidra instance
            url: Optional URL if different from default http://host:port

        Returns:
            str: Confirmation message or error
        """
        return register_instance(port, url)

    @mcp.tool()
    def unregister(port: int) -> str:
        """Remove a Ghidra instance from the registry.

        Args:
            port: Port number of the instance to unregister

        Returns:
            str: Confirmation message or error
        """
        with instances_lock:
            if port not in active_instances:
                return f"No instance found on port {port}"
            del active_instances[port]
            return f"Unregistered instance on port {port}"

    @mcp.tool()
    def use(port: int) -> str:
        """Make ``port`` the current working Ghidra instance.

        Args:
            port: Port number of the instance to use

        Returns:
            str: Confirmation message or error
        """
        global current_instance_port

        # Unknown ports get one registration attempt before being refused.
        if port not in active_instances:
            register_instance(port)
            if port not in active_instances:
                return f"Error: No active Ghidra instance found on port {port}"

        current_instance_port = port

        # Report which program and project the selection points at.
        with instances_lock:
            selected = active_instances[port]
            program = selected.get("file", "unknown program")
            project = selected.get("project", "unknown project")
            return f"Now using Ghidra instance on port {port} with {program} in project {project}"

    @mcp.tool()
    def current() -> dict:
        """Describe the current working Ghidra instance.

        Returns:
            dict: Details about the current instance and program
        """
        return ghidra_instance(port=current_instance_port)
params["name_contains"] = name_contains + if name_matches_regex: + params["name_matches_regex"] = name_matches_regex + + response = safe_get(port, "functions", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified + + @mcp.tool() + def get(name: str = None, address: str = None, port: int = None) -> dict: + """Get detailed information about a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Detailed function information + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_get(port, endpoint) + return simplify_response(response) + + @mcp.tool() + def decompile(name: str = None, address: str = None, + syntax_tree: bool = False, style: str = "normalize", + port: int = None) -> dict: + """Get decompiled code for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + syntax_tree: Include syntax tree (default: False) + style: Decompiler style (default: "normalize") + port: Specific Ghidra instance port (optional) + + Returns: + dict: Contains function information and decompiled code + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either 
name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + params = { + "syntax_tree": str(syntax_tree).lower(), + "style": style + } + + if address: + endpoint = f"functions/{address}/decompile" + else: + endpoint = f"functions/by-name/{quote(name)}/decompile" + + response = safe_get(port, endpoint, params) + simplified = simplify_response(response) + + # For AI consumption, make the decompiled code more directly accessible + if "result" in simplified and isinstance(simplified["result"], dict): + if "decompiled" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["decompiled"] + elif "ccode" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["ccode"] + elif "decompiled_text" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["decompiled_text"] + + return simplified + + @mcp.tool() + def disassemble(name: str = None, address: str = None, port: int = None) -> dict: + """Get disassembly for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Contains function information and disassembly text + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}/disassembly" + else: + endpoint = f"functions/by-name/{quote(name)}/disassembly" + + response = safe_get(port, endpoint) + return simplify_response(response) + + @mcp.tool() + def create(address: str, port: int = None) -> dict: + """Create a new function at the specified address + + Args: + address: Memory address in hex format where function starts + port: 
Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the created function information + """ + if not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "address": address + } + + response = safe_post(port, "functions", payload) + return simplify_response(response) + + @mcp.tool() + def rename(old_name: str = None, address: str = None, new_name: str = "", port: int = None) -> dict: + """Rename a function + + Args: + old_name: Current function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + new_name: New function name + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the updated function information + """ + if not (old_name or address) or not new_name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either old_name or address, and new_name parameters are required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "name": new_name + } + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(old_name)}" + + response = safe_patch(port, endpoint, payload) + return simplify_response(response) + + @mcp.tool() + def set_signature(name: str = None, address: str = None, signature: str = "", port: int = None) -> dict: + """Set function signature/prototype + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + signature: New function signature (e.g., "int func(char *data, int size)") + port: Specific Ghidra instance port (optional) + + Returns: + dict: Operation result with the updated function information + """ + if not (name or address) or not 
signature: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address, and signature parameters are required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + payload = { + "signature": signature + } + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_patch(port, endpoint, payload) + return simplify_response(response) + + @mcp.tool() + def get_variables(name: str = None, address: str = None, port: int = None) -> dict: + """Get variables for a function + + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format (mutually exclusive with name) + port: Specific Ghidra instance port (optional) + + Returns: + dict: Contains function information and list of variables + """ + if not name and not address: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Either name or address parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + port = _get_instance_port(port) + + if address: + endpoint = f"functions/{address}/variables" + else: + endpoint = f"functions/by-name/{quote(name)}/variables" + + response = safe_get(port, endpoint) + return simplify_response(response) + +# Additional tool groups would be defined here for other domains: +# @mcp.tool_group("data") +# @mcp.tool_group("memory") +# @mcp.tool_group("analysis") +# @mcp.tool_group("xrefs") +# @mcp.tool_group("symbols") +# @mcp.tool_group("variables") + +# ================= Startup ================= + +if __name__ == "__main__": + register_instance(DEFAULT_GHIDRA_PORT, + f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}") + + # Use quick discovery on startup + _discover_instances(QUICK_DISCOVERY_RANGE) + + # Start background discovery thread + discovery_thread = threading.Thread( + target=periodic_discovery, + daemon=True, + name="GhydraMCP-Discovery" + ) 
+ discovery_thread.start() + + signal.signal(signal.SIGINT, handle_sigint) + mcp.run(transport="stdio") \ No newline at end of file