mcghidra/bridge_mcp_hydra.py
Teal Bauer 4f3042f6ee feat: Update bridge_mcp_hydra.py to work with HATEOAS API
- Remove all legacy API compatibility code
- Implement simplified response structure for AI agents
- Add text representations of structured data like disassembly
- Support addressing functions by name or address
- Improve parameter validation and error handling
- Implement all endpoints from the API specification
2025-04-14 01:28:35 +02:00

1371 lines
45 KiB
Python

# /// script
# requires-python = ">=3.11"
# dependencies = [
# "mcp==1.6.0",
# "requests==2.32.3",
# ]
# ///
# GhydraMCP Bridge for Ghidra HATEOAS API
# This script implements the MCP_BRIDGE_API.md specification
import os
import signal
import sys
import threading
import time
from threading import Lock
from typing import Dict, List, Optional, Union, Any
from urllib.parse import quote, urlencode
from urllib.parse import urlparse
import requests
from mcp.server.fastmcp import FastMCP
# Allowed origins for CORS/CSRF protection
ALLOWED_ORIGINS = os.environ.get(
"GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",")
# Track active Ghidra instances (port -> info dict)
active_instances: Dict[int, dict] = {}
instances_lock = Lock()
DEFAULT_GHIDRA_PORT = 8192
DEFAULT_GHIDRA_HOST = "localhost"
# Port ranges for scanning
QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10)
FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20)
instructions = """
GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware.
First, run `discover_instances` to find open Ghidra instances. List tools to see what GhydraMCP can do.
"""
mcp = FastMCP("GhydraMCP", instructions=instructions)
ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST)
def get_instance_url(port: int) -> str:
"""Get URL for a Ghidra instance by port"""
with instances_lock:
if port in active_instances:
return active_instances[port]["url"]
if 8192 <= port <= 65535:
register_instance(port)
if port in active_instances:
return active_instances[port]["url"]
return f"http://{ghidra_host}:{port}"
def validate_origin(headers: dict) -> bool:
"""Validate request origin against allowed origins"""
origin = headers.get("Origin")
if not origin:
# No origin header - allow (browser same-origin policy applies)
return True
# Parse origin to get scheme+hostname
try:
parsed = urlparse(origin)
origin_base = f"{parsed.scheme}://{parsed.hostname}"
if parsed.port:
origin_base += f":{parsed.port}"
except:
return False
return origin_base in ALLOWED_ORIGINS
def _make_request(method: str, port: int, endpoint: str, params: dict = None,
json_data: dict = None, data: str = None,
headers: dict = None) -> dict:
"""Internal helper to make HTTP requests and handle common errors."""
url = f"{get_instance_url(port)}/{endpoint}"
request_headers = {'Accept': 'application/json'}
if headers:
request_headers.update(headers)
is_state_changing = method.upper() in ["POST", "PUT", "PATCH", "DELETE"]
if is_state_changing:
check_headers = json_data.get("headers", {}) if isinstance(
json_data, dict) else (headers or {})
if not validate_origin(check_headers):
return {
"success": False,
"error": "Origin not allowed",
"status_code": 403,
"timestamp": int(time.time() * 1000)
}
if json_data is not None:
request_headers['Content-Type'] = 'application/json'
elif data is not None:
request_headers['Content-Type'] = 'text/plain'
try:
response = requests.request(
method,
url,
params=params,
json=json_data,
data=data,
headers=request_headers,
timeout=10
)
try:
parsed_json = response.json()
# Add timestamp if not present
if isinstance(parsed_json, dict) and "timestamp" not in parsed_json:
parsed_json["timestamp"] = int(time.time() * 1000)
return parsed_json
except ValueError:
if response.ok:
return {
"success": False,
"error": "Received non-JSON success response from Ghidra plugin",
"status_code": response.status_code,
"response_text": response.text[:500],
"timestamp": int(time.time() * 1000)
}
else:
return {
"success": False,
"error": f"HTTP {response.status_code} - Non-JSON error response",
"status_code": response.status_code,
"response_text": response.text[:500],
"timestamp": int(time.time() * 1000)
}
except requests.exceptions.Timeout:
return {
"success": False,
"error": "Request timed out",
"status_code": 408,
"timestamp": int(time.time() * 1000)
}
except requests.exceptions.ConnectionError:
return {
"success": False,
"error": f"Failed to connect to Ghidra instance at {url}",
"status_code": 503,
"timestamp": int(time.time() * 1000)
}
except Exception as e:
return {
"success": False,
"error": f"An unexpected error occurred: {str(e)}",
"exception": e.__class__.__name__,
"timestamp": int(time.time() * 1000)
}
def safe_get(port: int, endpoint: str, params: dict = None) -> dict:
"""Make GET request to Ghidra instance"""
return _make_request("GET", port, endpoint, params=params)
def safe_put(port: int, endpoint: str, data: dict) -> dict:
"""Make PUT request to Ghidra instance with JSON payload"""
headers = data.pop("headers", None) if isinstance(data, dict) else None
return _make_request("PUT", port, endpoint, json_data=data, headers=headers)
def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict:
"""Perform a POST request to a specific Ghidra instance with JSON or text payload"""
headers = None
json_payload = None
text_payload = None
if isinstance(data, dict):
headers = data.pop("headers", None)
json_payload = data
else:
text_payload = data
return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers)
def safe_patch(port: int, endpoint: str, data: dict) -> dict:
"""Perform a PATCH request to a specific Ghidra instance with JSON payload"""
headers = data.pop("headers", None) if isinstance(data, dict) else None
return _make_request("PATCH", port, endpoint, json_data=data, headers=headers)
def safe_delete(port: int, endpoint: str) -> dict:
"""Perform a DELETE request to a specific Ghidra instance"""
return _make_request("DELETE", port, endpoint)
def simplify_response(response: dict) -> dict:
"""
Simplify HATEOAS response data for easier AI agent consumption
- Removes _links from result entries
- Flattens nested structures when appropriate
- Preserves important metadata
- Converts structured data like disassembly to text for easier consumption
"""
if not isinstance(response, dict):
return response
# Make a copy to avoid modifying the original
result = response.copy()
# Simplify the main result data if present
if "result" in result:
# Handle array results
if isinstance(result["result"], list):
simplified_items = []
for item in result["result"]:
if isinstance(item, dict):
# Remove HATEOAS links from individual items
item_copy = item.copy()
item_copy.pop("_links", None)
simplified_items.append(item_copy)
else:
simplified_items.append(item)
result["result"] = simplified_items
# Handle object results
elif isinstance(result["result"], dict):
result_copy = result["result"].copy()
# Remove links from result object
result_copy.pop("_links", None)
# Special case for disassembly - convert to text for easier consumption
if "instructions" in result_copy and isinstance(result_copy["instructions"], list):
disasm_text = ""
for instr in result_copy["instructions"]:
if isinstance(instr, dict):
addr = instr.get("address", "")
mnemonic = instr.get("mnemonic", "")
operands = instr.get("operands", "")
bytes_str = instr.get("bytes", "")
# Format: address: bytes mnemonic operands
disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n"
# Add the text representation while preserving the original structured data
result_copy["disassembly_text"] = disasm_text
# Special case for decompiled code - make sure it's directly accessible
if "ccode" in result_copy:
result_copy["decompiled_text"] = result_copy["ccode"]
elif "decompiled" in result_copy:
result_copy["decompiled_text"] = result_copy["decompiled"]
result["result"] = result_copy
# Remove HATEOAS links from the top level
result.pop("_links", None)
return result
# Instance management tools
@mcp.tool()
def list_instances() -> dict:
"""List all active Ghidra instances
Returns:
dict: Contains 'instances' list with port, url, project and file info for each instance
"""
with instances_lock:
return {
"instances": [
{
"port": port,
"url": info["url"],
"project": info.get("project", ""),
"file": info.get("file", "")
}
for port, info in active_instances.items()
]
}
@mcp.tool()
def register_instance(port: int, url: str = None) -> str:
"""Register a new Ghidra instance
Args:
port: Port number of the Ghidra instance
url: Optional URL if different from default http://host:port
Returns:
str: Confirmation message or error
"""
if url is None:
url = f"http://{ghidra_host}:{port}"
try:
# Check for HATEOAS API by checking plugin-version endpoint
test_url = f"{url}/plugin-version"
response = requests.get(test_url, timeout=2)
if not response.ok:
return f"Error: Instance at {url} is not responding properly to HATEOAS API"
project_info = {"url": url}
try:
# Get program info from HATEOAS API
info_url = f"{url}/program"
try:
info_response = requests.get(info_url, timeout=2)
if info_response.ok:
try:
info_data = info_response.json()
if "result" in info_data:
result = info_data["result"]
if isinstance(result, dict):
project_info["project"] = result.get("project", "")
project_info["file"] = result.get("name", "")
project_info["path"] = result.get("path", "")
project_info["language_id"] = result.get("language_id", "")
project_info["compiler_spec_id"] = result.get("compiler_spec_id", "")
project_info["image_base"] = result.get("image_base", "")
except Exception as e:
print(f"Error parsing info endpoint: {e}", file=sys.stderr)
except Exception as e:
print(f"Error connecting to info endpoint: {e}", file=sys.stderr)
except Exception:
# Non-critical, continue with registration even if project info fails
pass
with instances_lock:
active_instances[port] = project_info
return f"Registered instance on port {port} at {url}"
except Exception as e:
return f"Error: Could not connect to instance at {url}: {str(e)}"
@mcp.tool()
def unregister_instance(port: int) -> str:
"""Unregister a Ghidra instance
Args:
port: Port number of the instance to unregister
Returns:
str: Confirmation message or error
"""
with instances_lock:
if port in active_instances:
del active_instances[port]
return f"Unregistered instance on port {port}"
return f"No instance found on port {port}"
@mcp.tool()
def discover_instances(host: str = None) -> dict:
"""Discover available Ghidra instances by scanning ports
Args:
host: Optional host to scan (default: configured ghidra_host)
Returns:
dict: Contains 'found' count and 'instances' list with discovery results
"""
return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5)
def _discover_instances(port_range, host=None, timeout=0.5) -> dict:
"""Internal function to discover Ghidra instances by scanning ports"""
found_instances = []
scan_host = host if host is not None else ghidra_host
for port in port_range:
if port in active_instances:
continue
url = f"http://{scan_host}:{port}"
try:
# Try HATEOAS API via plugin-version endpoint
test_url = f"{url}/plugin-version"
response = requests.get(test_url, timeout=timeout)
if response.ok:
result = register_instance(port, url)
found_instances.append(
{"port": port, "url": url, "result": result})
except requests.exceptions.RequestException:
# Instance not available, just continue
continue
return {
"found": len(found_instances),
"instances": found_instances
}
@mcp.tool()
def get_plugin_version(port: int = DEFAULT_GHIDRA_PORT) -> dict:
"""Get version information for the Ghidra plugin
Args:
port: Ghidra instance port (default: 8192)
Returns:
dict: Plugin and API version information
"""
response = safe_get(port, "plugin-version")
return simplify_response(response)
@mcp.tool()
def get_program_info(port: int = DEFAULT_GHIDRA_PORT) -> dict:
"""Get detailed information about the current program
Args:
port: Ghidra instance port (default: 8192)
Returns:
dict: Contains metadata about the current program including name,
architecture, memory layout, compiler, etc.
"""
response = safe_get(port, "program")
return simplify_response(response)
@mcp.tool()
def list_functions(port: int = DEFAULT_GHIDRA_PORT,
offset: int = 0,
limit: int = 100,
addr: str = None,
name: str = None,
name_contains: str = None,
name_matches_regex: str = None) -> dict:
"""List functions in the current program with filtering and pagination
Args:
port: Ghidra instance port (default: 8192)
offset: Pagination offset (default: 0)
limit: Maximum items to return (default: 100)
addr: Filter by address (hexadecimal)
name: Exact name match filter (case-sensitive)
name_contains: Substring name filter (case-insensitive)
name_matches_regex: Regex name filter
Returns:
dict: {
"result": list of function info objects,
"size": total count,
"offset": current offset,
"limit": current limit
}
"""
params = {
"offset": offset,
"limit": limit
}
if addr:
params["addr"] = addr
if name:
params["name"] = name
if name_contains:
params["name_contains"] = name_contains
if name_matches_regex:
params["name_matches_regex"] = name_matches_regex
response = safe_get(port, "functions", params)
simplified = simplify_response(response)
# Ensure we maintain pagination metadata
if isinstance(simplified, dict) and "error" not in simplified:
simplified.setdefault("size", len(simplified.get("result", [])))
simplified.setdefault("offset", offset)
simplified.setdefault("limit", limit)
return simplified
@mcp.tool()
def get_function(port: int = DEFAULT_GHIDRA_PORT,
address: str = None,
name: str = None) -> dict:
"""Get details for a function by address or name
Args:
port: Ghidra instance port (default: 8192)
address: Function address in hex format (mutually exclusive with name)
name: Function name (mutually exclusive with address)
Returns:
dict: Contains function name, address, signature and other details
"""
if not address and not name:
return {
"success": False,
"error": "Either address or name parameter is required",
"timestamp": int(time.time() * 1000)
}
if address:
endpoint = f"functions/{address}"
else:
endpoint = f"functions/by-name/{quote(name)}"
response = safe_get(port, endpoint)
return simplify_response(response)
@mcp.tool()
def decompile_function(port: int = DEFAULT_GHIDRA_PORT,
address: str = None,
name: str = None,
syntax_tree: bool = False,
style: str = "normalize") -> dict:
"""Get decompiled code for a function by address or name
Args:
port: Ghidra instance port (default: 8192)
address: Function address in hex format (mutually exclusive with name)
name: Function name (mutually exclusive with address)
syntax_tree: Include syntax tree (default: False)
style: Decompiler style (default: "normalize")
Returns:
dict: Contains function information and decompiled code
"""
if not address and not name:
return {
"success": False,
"error": "Either address or name parameter is required",
"timestamp": int(time.time() * 1000)
}
params = {
"syntax_tree": str(syntax_tree).lower(),
"style": style
}
if address:
endpoint = f"functions/{address}/decompile"
else:
endpoint = f"functions/by-name/{quote(name)}/decompile"
response = safe_get(port, endpoint, params)
simplified = simplify_response(response)
# For AI consumption, make the decompiled code more directly accessible
if "result" in simplified and isinstance(simplified["result"], dict):
if "decompiled" in simplified["result"]:
simplified["decompiled_code"] = simplified["result"]["decompiled"]
elif "ccode" in simplified["result"]:
simplified["decompiled_code"] = simplified["result"]["ccode"]
elif "decompiled_text" in simplified["result"]:
simplified["decompiled_code"] = simplified["result"]["decompiled_text"]
return simplified
@mcp.tool()
def disassemble_function(port: int = DEFAULT_GHIDRA_PORT,
address: str = None,
name: str = None) -> dict:
"""Get disassembly for a function by address or name
Args:
port: Ghidra instance port (default: 8192)
address: Function address in hex format (mutually exclusive with name)
name: Function name (mutually exclusive with address)
Returns:
dict: Contains function information and disassembly text
"""
if not address and not name:
return {
"success": False,
"error": "Either address or name parameter is required",
"timestamp": int(time.time() * 1000)
}
if address:
endpoint = f"functions/{address}/disassembly"
else:
endpoint = f"functions/by-name/{quote(name)}/disassembly"
response = safe_get(port, endpoint)
simplified = simplify_response(response)
# For AI consumption, add a plain text version of the disassembly if not already present
if "result" in simplified and isinstance(simplified["result"], dict):
if "instructions" in simplified["result"] and isinstance(simplified["result"]["instructions"], list):
if "disassembly_text" not in simplified["result"]:
instr_list = simplified["result"]["instructions"]
disasm_text = ""
for instr in instr_list:
if isinstance(instr, dict):
addr = instr.get("address", "")
mnemonic = instr.get("mnemonic", "")
operands = instr.get("operands", "")
bytes_str = instr.get("bytes", "")
# Format: address: bytes mnemonic operands
disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n"
simplified["result"]["disassembly_text"] = disasm_text
# Also make it more directly accessible
simplified["disassembly_text"] = disasm_text
return simplified
@mcp.tool()
def get_function_variables(port: int = DEFAULT_GHIDRA_PORT,
address: str = None,
name: str = None) -> dict:
"""Get variables for a function by address or name
Args:
port: Ghidra instance port (default: 8192)
address: Function address in hex format (mutually exclusive with name)
name: Function name (mutually exclusive with address)
Returns:
dict: Contains function information and list of variables
"""
if not address and not name:
return {
"success": False,
"error": "Either address or name parameter is required",
"timestamp": int(time.time() * 1000)
}
if address:
endpoint = f"functions/{address}/variables"
else:
endpoint = f"functions/by-name/{quote(name)}/variables"
response = safe_get(port, endpoint)
return simplify_response(response)
@mcp.tool()
def list_segments(port: int = DEFAULT_GHIDRA_PORT,
offset: int = 0,
limit: int = 100,
name: str = None) -> dict:
"""List memory segments with filtering and pagination
Args:
port: Ghidra instance port (default: 8192)
offset: Pagination offset (default: 0)
limit: Maximum items to return (default: 100)
name: Filter by segment name (case-sensitive substring match)
Returns:
dict: {
"result": list of segment objects with properties including name, start, end, size,
permissions (readable, writable, executable), and initialized status,
"size": total count of segments matching the filter,
"offset": current offset in pagination,
"limit": current limit for pagination
}
"""
params = {
"offset": offset,
"limit": limit
}
if name:
params["name"] = name
response = safe_get(port, "segments", params)
simplified = simplify_response(response)
# Ensure we maintain pagination metadata
if isinstance(simplified, dict) and "error" not in simplified:
simplified.setdefault("size", len(simplified.get("result", [])))
simplified.setdefault("offset", offset)
simplified.setdefault("limit", limit)
return simplified
@mcp.tool()
def list_symbols(port: int = DEFAULT_GHIDRA_PORT,
offset: int = 0,
limit: int = 100,
addr: str = None,
name: str = None,
name_contains: str = None,
type: str = None) -> dict:
"""List symbols with filtering and pagination
Args:
port: Ghidra instance port (default: 8192)
offset: Pagination offset (default: 0)
limit: Maximum items to return (default: 100)
addr: Filter by address (hexadecimal)
name: Exact name match filter (case-sensitive)
name_contains: Substring name filter (case-insensitive)
type: Filter by symbol type (e.g. "function", "data", "label")
Returns:
dict: {
"result": list of symbol objects,
"size": total count,
"offset": current offset,
"limit": current limit
}
"""
params = {
"offset": offset,
"limit": limit
}
if addr:
params["addr"] = addr
if name:
params["name"] = name
if name_contains:
params["name_contains"] = name_contains
if type:
params["type"] = type
response = safe_get(port, "symbols", params)
simplified = simplify_response(response)
# Ensure we maintain pagination metadata
if isinstance(simplified, dict) and "error" not in simplified:
simplified.setdefault("size", len(simplified.get("result", [])))
simplified.setdefault("offset", offset)
simplified.setdefault("limit", limit)
return simplified
@mcp.tool()
def list_data_items(port: int = DEFAULT_GHIDRA_PORT,
offset: int = 0,
limit: int = 100,
addr: str = None,
name: str = None,
name_contains: str = None,
type: str = None) -> dict:
"""List defined data items with filtering and pagination
Args:
port: Ghidra instance port (default: 8192)
offset: Pagination offset (default: 0)
limit: Maximum items to return (default: 100)
addr: Filter by address (hexadecimal)
name: Exact name match filter (case-sensitive)
name_contains: Substring name filter (case-insensitive)
type: Filter by data type (e.g. "string", "dword")
Returns:
dict: {
"result": list of data item objects,
"size": total count,
"offset": current offset,
"limit": current limit
}
"""
params = {
"offset": offset,
"limit": limit
}
if addr:
params["addr"] = addr
if name:
params["name"] = name
if name_contains:
params["name_contains"] = name_contains
if type:
params["type"] = type
response = safe_get(port, "data", params)
simplified = simplify_response(response)
# Ensure we maintain pagination metadata
if isinstance(simplified, dict) and "error" not in simplified:
simplified.setdefault("size", len(simplified.get("result", [])))
simplified.setdefault("offset", offset)
simplified.setdefault("limit", limit)
return simplified
@mcp.tool()
def read_memory(port: int = DEFAULT_GHIDRA_PORT,
address: str = "",
length: int = 16,
format: str = "hex") -> dict:
"""Read bytes from memory
Args:
port: Ghidra instance port (default: 8192)
address: Memory address in hex format
length: Number of bytes to read (default: 16)
format: Output format - "hex", "base64", or "string" (default: "hex")
Returns:
dict: {
"address": original address,
"length": bytes read,
"format": output format,
"bytes": the memory contents as a string in the specified format,
"timestamp": response timestamp
}
"""
if not address:
return {
"success": False,
"error": "Address parameter is required",
"timestamp": int(time.time() * 1000)
}
params = {
"length": length,
"format": format
}
response = safe_get(port, f"memory/{address}", params)
simplified = simplify_response(response)
# Ensure the result is simple and directly usable
if "result" in simplified and isinstance(simplified["result"], dict):
bytes_data = simplified["result"].get("bytes", "")
memory_info = {
"address": address,
"length": length,
"format": format,
"bytes": bytes_data,
"timestamp": simplified.get("timestamp", int(time.time() * 1000))
}
return memory_info
return simplified
@mcp.tool()
def write_memory(port: int = DEFAULT_GHIDRA_PORT,
address: str = "",
bytes_data: str = "",
format: str = "hex") -> dict:
"""Write bytes to memory (use with caution)
Args:
port: Ghidra instance port (default: 8192)
address: Memory address in hex format
bytes_data: Data to write (format depends on 'format' parameter)
format: Input format - "hex", "base64", or "string" (default: "hex")
Returns:
dict: Operation result with success status containing:
- address: the target memory address
- length: number of bytes written
- bytesWritten: confirmation of bytes written
"""
if not address or not bytes_data:
return {
"success": False,
"error": "Address and bytes parameters are required",
"timestamp": int(time.time() * 1000)
}
payload = {
"bytes": bytes_data,
"format": format
}
response = safe_patch(port, f"memory/{address}", payload)
return simplify_response(response)
@mcp.tool()
def list_xrefs(port: int = DEFAULT_GHIDRA_PORT,
to_addr: str = None,
from_addr: str = None,
type: str = None,
offset: int = 0,
limit: int = 100) -> dict:
"""List cross-references with filtering and pagination
Args:
port: Ghidra instance port (default: 8192)
to_addr: Filter references to this address (hexadecimal)
from_addr: Filter references from this address (hexadecimal)
type: Filter by reference type (e.g. "CALL", "READ", "WRITE")
offset: Pagination offset (default: 0)
limit: Maximum items to return (default: 100)
Returns:
dict: {
"result": list of xref objects with from_addr, to_addr, type, from_function, to_function fields,
"size": total number of xrefs matching the filter,
"offset": current offset for pagination,
"limit": current limit for pagination
}
"""
params = {
"offset": offset,
"limit": limit
}
if to_addr:
params["to_addr"] = to_addr
if from_addr:
params["from_addr"] = from_addr
if type:
params["type"] = type
response = safe_get(port, "xrefs", params)
simplified = simplify_response(response)
# Ensure we maintain pagination metadata
if isinstance(simplified, dict) and "error" not in simplified:
simplified.setdefault("size", len(simplified.get("result", [])))
simplified.setdefault("offset", offset)
simplified.setdefault("limit", limit)
# For AI consumption, make the references more directly accessible
if "result" in simplified and isinstance(simplified["result"], dict) and "references" in simplified["result"]:
simplified["xrefs"] = simplified["result"]["references"]
return simplified
@mcp.tool()
def get_current_address(port: int = DEFAULT_GHIDRA_PORT) -> dict:
"""Get the address currently selected in Ghidra's UI
Args:
port: Ghidra instance port (default: 8192)
Returns:
Dict containing:
- success: boolean indicating success
- result: object with address field
- error: error message if failed
- timestamp: timestamp of response
"""
response = safe_get(port, "address")
return simplify_response(response)
@mcp.tool()
def get_current_function(port: int = DEFAULT_GHIDRA_PORT) -> dict:
"""Get the function currently selected in Ghidra's UI
Args:
port: Ghidra instance port (default: 8192)
Returns:
Dict containing:
- success: boolean indicating success
- result: object with name, address and signature fields
- error: error message if failed
- timestamp: timestamp of response
"""
response = safe_get(port, "function")
return simplify_response(response)
@mcp.tool()
def analyze_program(port: int = DEFAULT_GHIDRA_PORT,
analysis_options: dict = None) -> dict:
"""Run analysis on the current program
Args:
port: Ghidra instance port (default: 8192)
analysis_options: Dictionary of analysis options to enable/disable
(e.g. {"functionRecovery": True, "dataRefs": False})
None means use default analysis options
Returns:
dict: Analysis operation result with status containing:
- program: program name
- analysis_triggered: boolean indicating if analysis was successfully started
- message: status message
"""
response = safe_post(port, "analysis", analysis_options or {})
return simplify_response(response)
@mcp.tool()
def create_function(port: int = DEFAULT_GHIDRA_PORT,
address: str = "") -> dict:
"""Create a new function at the specified address
Args:
port: Ghidra instance port (default: 8192)
address: Memory address in hex format where function starts
Returns:
dict: Operation result with the created function information
"""
if not address:
return {
"success": False,
"error": "Address parameter is required",
"timestamp": int(time.time() * 1000)
}
payload = {
"address": address
}
response = safe_post(port, "functions", payload)
return simplify_response(response)
@mcp.tool()
def rename_function(port: int = DEFAULT_GHIDRA_PORT,
address: str = None,
name: str = None,
new_name: str = "") -> dict:
"""Rename a function
Args:
port: Ghidra instance port (default: 8192)
address: Function address in hex format (mutually exclusive with name)
name: Current function name (mutually exclusive with address)
new_name: New function name
Returns:
dict: Operation result with the updated function information
"""
if not (address or name) or not new_name:
return {
"success": False,
"error": "Either address or name, and new_name parameters are required",
"timestamp": int(time.time() * 1000)
}
payload = {
"name": new_name
}
if address:
endpoint = f"functions/{address}"
else:
endpoint = f"functions/by-name/{quote(name)}"
response = safe_patch(port, endpoint, payload)
return simplify_response(response)
@mcp.tool()
def set_function_signature(port: int = DEFAULT_GHIDRA_PORT,
address: str = None,
name: str = None,
signature: str = "") -> dict:
"""Set function signature/prototype
Args:
port: Ghidra instance port (default: 8192)
address: Function address in hex format (mutually exclusive with name)
name: Function name (mutually exclusive with address)
signature: New function signature (e.g., "int func(char *data, int size)")
Returns:
dict: Operation result with the updated function information
"""
if not (address or name) or not signature:
return {
"success": False,
"error": "Either address or name, and signature parameters are required",
"timestamp": int(time.time() * 1000)
}
payload = {
"signature": signature
}
if address:
endpoint = f"functions/{address}"
else:
endpoint = f"functions/by-name/{quote(name)}"
response = safe_patch(port, endpoint, payload)
return simplify_response(response)
@mcp.tool()
def rename_variable(port: int = DEFAULT_GHIDRA_PORT,
function_address: str = None,
function_name: str = None,
variable_name: str = "",
new_name: str = "") -> dict:
"""Rename a variable in a function
Args:
port: Ghidra instance port (default: 8192)
function_address: Function address in hex format (mutually exclusive with function_name)
function_name: Function name (mutually exclusive with function_address)
variable_name: Current variable name
new_name: New variable name
Returns:
dict: Operation result with the updated variable information
"""
if not (function_address or function_name) or not variable_name or not new_name:
return {
"success": False,
"error": "Function identifier (address or name), variable_name, and new_name parameters are required",
"timestamp": int(time.time() * 1000)
}
payload = {
"name": new_name
}
if function_address:
endpoint = f"functions/{function_address}/variables/{variable_name}"
else:
endpoint = f"functions/by-name/{quote(function_name)}/variables/{variable_name}"
response = safe_patch(port, endpoint, payload)
return simplify_response(response)
@mcp.tool()
def set_variable_type(port: int = DEFAULT_GHIDRA_PORT,
function_address: str = None,
function_name: str = None,
variable_name: str = "",
data_type: str = "") -> dict:
"""Change the data type of a variable in a function
Args:
port: Ghidra instance port (default: 8192)
function_address: Function address in hex format (mutually exclusive with function_name)
function_name: Function name (mutually exclusive with function_address)
variable_name: Variable name
data_type: New data type (e.g. "int", "char *")
Returns:
dict: Operation result with the updated variable information
"""
if not (function_address or function_name) or not variable_name or not data_type:
return {
"success": False,
"error": "Function identifier (address or name), variable_name, and data_type parameters are required",
"timestamp": int(time.time() * 1000)
}
payload = {
"data_type": data_type
}
if function_address:
endpoint = f"functions/{function_address}/variables/{variable_name}"
else:
endpoint = f"functions/by-name/{quote(function_name)}/variables/{variable_name}"
response = safe_patch(port, endpoint, payload)
return simplify_response(response)
@mcp.tool()
def create_data(port: int = DEFAULT_GHIDRA_PORT,
address: str = "",
data_type: str = "",
size: int = None) -> dict:
"""Define a new data item at the specified address
Args:
port: Ghidra instance port (default: 8192)
address: Memory address in hex format
data_type: Data type (e.g. "string", "dword", "byte")
size: Optional size in bytes for the data item
Returns:
dict: Operation result with the created data information
"""
if not address or not data_type:
return {
"success": False,
"error": "Address and data_type parameters are required",
"timestamp": int(time.time() * 1000)
}
payload = {
"address": address,
"type": data_type
}
if size is not None:
payload["size"] = size
response = safe_post(port, "data", payload)
return simplify_response(response)
@mcp.tool()
def list_namespaces(port: int = DEFAULT_GHIDRA_PORT,
offset: int = 0,
limit: int = 100) -> dict:
"""List namespaces with pagination
Args:
port: Ghidra instance port (default: 8192)
offset: Pagination offset (default: 0)
limit: Maximum items to return (default: 100)
Returns:
dict: Contains list of namespaces with pagination information
"""
params = {
"offset": offset,
"limit": limit
}
response = safe_get(port, "namespaces", params)
simplified = simplify_response(response)
# Ensure we maintain pagination metadata
if isinstance(simplified, dict) and "error" not in simplified:
simplified.setdefault("size", len(simplified.get("result", [])))
simplified.setdefault("offset", offset)
simplified.setdefault("limit", limit)
return simplified
@mcp.tool()
def get_callgraph(port: int = DEFAULT_GHIDRA_PORT,
function: str = None,
max_depth: int = 3) -> dict:
"""Get function call graph visualization data
Args:
port: Ghidra instance port (default: 8192)
function: Starting function name or address (None starts from entry point)
max_depth: Maximum call depth to analyze (default: 3)
Returns:
dict: Graph data with:
- root: name of the starting function
- root_address: address of the starting function
- max_depth: depth limit used for graph generation
- nodes: list of function nodes in the graph (with id, name, address)
- edges: list of call relationships between functions
"""
params = {"max_depth": max_depth}
if function:
params["function"] = function
response = safe_get(port, "analysis/callgraph", params)
return simplify_response(response)
@mcp.tool()
def get_dataflow(port: int = DEFAULT_GHIDRA_PORT,
address: str = "",
direction: str = "forward",
max_steps: int = 50) -> dict:
"""Perform data flow analysis from an address
Args:
port: Ghidra instance port (default: 8192)
address: Starting address in hex format
direction: "forward" or "backward" (default: "forward")
max_steps: Maximum analysis steps (default: 50)
Returns:
dict: Data flow analysis results
"""
if not address:
return {
"success": False,
"error": "Address parameter is required",
"timestamp": int(time.time() * 1000)
}
params = {
"address": address,
"direction": direction,
"max_steps": max_steps
}
response = safe_get(port, "analysis/dataflow", params)
return simplify_response(response)
@mcp.tool()
def set_comment(port: int = DEFAULT_GHIDRA_PORT,
address: str = "",
comment: str = "",
comment_type: str = "plate") -> dict:
"""Set a comment at the specified address
Args:
port: Ghidra instance port (default: 8192)
address: Memory address in hex format
comment: Comment text
comment_type: Type of comment -
"plate" (disassembly),
"pre" (pre-function),
"post" (post-function),
"eol" (end of line),
"repeatable" (shows each time referenced)
(default: "plate")
Returns:
dict: Operation result
"""
if not address:
return {
"success": False,
"error": "Address parameter is required",
"timestamp": int(time.time() * 1000)
}
payload = {
"comment": comment
}
response = safe_post(port, f"memory/{address}/comments/{comment_type}", payload)
return simplify_response(response)
def handle_sigint(signum, frame):
os._exit(0)
def periodic_discovery():
"""Periodically discover new instances"""
while True:
try:
_discover_instances(FULL_DISCOVERY_RANGE, timeout=0.5)
with instances_lock:
ports_to_remove = []
for port, info in active_instances.items():
url = info["url"]
try:
# Check HATEOAS API via plugin-version endpoint
response = requests.get(f"{url}/plugin-version", timeout=1)
if not response.ok:
ports_to_remove.append(port)
except requests.exceptions.RequestException:
ports_to_remove.append(port)
for port in ports_to_remove:
del active_instances[port]
print(f"Removed unreachable instance on port {port}")
except Exception as e:
print(f"Error in periodic discovery: {e}")
time.sleep(30)
if __name__ == "__main__":
register_instance(DEFAULT_GHIDRA_PORT,
f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}")
discover_instances()
discovery_thread = threading.Thread(
target=periodic_discovery,
daemon=True,
name="GhydraMCP-Discovery"
)
discovery_thread.start()
signal.signal(signal.SIGINT, handle_sigint)
mcp.run(transport="stdio")