mcghidra/bridge_mcp_hydra.py
2025-04-09 23:36:05 +02:00

1154 lines
38 KiB
Python

# /// script
# requires-python = ">=3.11"
# dependencies = [
# "mcp==1.6.0",
# "requests==2.32.3",
# ]
# ///
import os
import signal
import sys
import threading
import time
from threading import Lock
from typing import Dict, List
from urllib.parse import quote
from urllib.parse import urlparse
import requests
from mcp.server.fastmcp import FastMCP
# Allowed origins for CORS/CSRF protection.
# Read from the comma-separated GHIDRA_ALLOWED_ORIGINS environment variable
# (default: "http://localhost"). Whitespace around each entry is stripped and
# empty entries are dropped, so a value such as "http://a, http://b" yields
# two usable origins instead of one entry with a leading space that could
# never match a parsed Origin header.
ALLOWED_ORIGINS = [
    origin.strip()
    for origin in os.environ.get("GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",")
    if origin.strip()
]
# Defaults used when no explicit host or port is supplied
DEFAULT_GHIDRA_HOST = "localhost"
DEFAULT_GHIDRA_PORT = 8192

# Port ranges for scanning: both start at the default port; the quick range
# spans 10 ports and the full range spans 20.
QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT + 10)
FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT + 20)

# Track active Ghidra instances (port -> info dict); instances_lock is the
# lock the rest of the module takes around accesses to this registry.
instances_lock = Lock()
active_instances: Dict[int, dict] = {}
# Host used to build instance URLs; overridable through GHIDRA_HYDRA_HOST.
ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST)

# Server-level instructions handed to FastMCP when the server object is created.
instructions = """
GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware.
First, run `discover_instances` to find open Ghidra instances. List tools to see what GhydraMCP can do.
"""

# The MCP server object; every @mcp.tool() function in this module registers on it.
mcp = FastMCP("GhydraMCP", instructions=instructions)
def get_instance_url(port: int) -> str:
    """Get URL for a Ghidra instance by port.

    Looks the port up in the registry first. If it is unknown and lies in
    8192..65535, a registration attempt is made and the registry is checked
    again. When the port is still unknown, the default
    ``http://{ghidra_host}:{port}`` URL is returned.

    Args:
        port: Port number of the Ghidra instance.

    Returns:
        str: Base URL (no trailing slash) for the instance.
    """
    with instances_lock:
        info = active_instances.get(port)
        if info is not None:
            return info["url"]

    if 8192 <= port <= 65535:
        # register_instance() acquires instances_lock itself. The lock is a
        # plain (non-reentrant) threading.Lock, so the call must happen
        # outside the `with` block or it would block forever on its own lock.
        register_instance(port)
        with instances_lock:
            info = active_instances.get(port)
            if info is not None:
                return info["url"]

    return f"http://{ghidra_host}:{port}"
def validate_origin(headers: dict) -> bool:
    """Validate request origin against allowed origins.

    Args:
        headers: Mapping of request headers; only the "Origin" key is read.

    Returns:
        bool: True when there is no (or an empty) Origin header, or when the
        origin reduced to ``scheme://hostname[:port]`` is listed in
        ALLOWED_ORIGINS. False when the origin cannot be parsed or is not
        listed.
    """
    origin = headers.get("Origin")
    if not origin:
        return True  # No origin header - allow (browser same-origin policy applies)

    # Parse origin to get scheme+hostname(+port). urlparse() and the .port
    # property raise ValueError on malformed input (bad IPv6 literal, port out
    # of range); non-string values can raise TypeError/AttributeError. Only
    # those are treated as "invalid origin" so that KeyboardInterrupt and
    # SystemExit are no longer swallowed as they were by a bare `except:`.
    try:
        parsed = urlparse(origin)
        origin_base = f"{parsed.scheme}://{parsed.hostname}"
        if parsed.port:
            origin_base += f":{parsed.port}"
    except (ValueError, TypeError, AttributeError):
        return False

    return origin_base in ALLOWED_ORIGINS
def _error_response(error: str, status_code: int = None, **extra) -> dict:
    """Build the bridge's failure payload, stamped with the current time in ms."""
    payload = {"success": False, "error": error}
    if status_code is not None:
        payload["status_code"] = status_code
    payload.update(extra)
    payload["timestamp"] = int(time.time() * 1000)
    return payload


def _make_request(method: str, port: int, endpoint: str, params: dict = None, json_data: dict = None, data: str = None, headers: dict = None) -> dict:
    """Internal helper to make HTTP requests and handle common errors.

    Args:
        method: HTTP verb, e.g. "GET" or "POST".
        port: Port of the target Ghidra instance (resolved via get_instance_url).
        endpoint: Path appended to the instance URL after a "/".
        params: Optional query-string parameters.
        json_data: Optional JSON body; sets Content-Type to application/json.
        data: Optional raw text body; sets Content-Type to text/plain when no
            JSON body is given.
        headers: Optional extra request headers (merged over the defaults).

    Returns:
        dict: The parsed JSON response (a "timestamp" is added to dict
        responses that lack one), or an error payload with "success": False
        when the origin is rejected, the response is not JSON, the request
        times out, the connection fails, or any other exception occurs.
    """
    url = f"{get_instance_url(port)}/{endpoint}"
    request_headers = {'Accept': 'application/json'}
    if headers:
        request_headers.update(headers)

    if method.upper() in ("POST", "PUT", "PATCH", "DELETE"):
        # State-changing requests are origin-checked. safe_put/safe_post move
        # a payload's "headers" entry into the `headers` argument before
        # calling here, so that argument must be validated too; looking only
        # inside json_data would let every JSON request through unchecked.
        # A "headers" dict still embedded in json_data is checked as before.
        embedded = json_data.get("headers") if isinstance(json_data, dict) else None
        for candidate in (headers, embedded):
            if isinstance(candidate, dict) and not validate_origin(candidate):
                return _error_response("Origin not allowed", 403)

    if json_data is not None:
        request_headers['Content-Type'] = 'application/json'
    elif data is not None:
        request_headers['Content-Type'] = 'text/plain'

    try:
        response = requests.request(
            method,
            url,
            params=params,
            json=json_data,
            data=data,
            headers=request_headers,
            timeout=10
        )
        try:
            parsed_json = response.json()
        except ValueError:
            # Body was not JSON: report it, keeping a short excerpt for debugging.
            if response.ok:
                message = "Received non-JSON success response from Ghidra plugin"
            else:
                message = f"HTTP {response.status_code} - Non-JSON error response"
            return _error_response(
                message,
                response.status_code,
                response_text=response.text[:500],
            )
        if isinstance(parsed_json, dict) and "timestamp" not in parsed_json:
            parsed_json["timestamp"] = int(time.time() * 1000)
        return parsed_json
    except requests.exceptions.Timeout:
        return _error_response("Request timed out", 408)
    except requests.exceptions.ConnectionError:
        return _error_response(f"Failed to connect to Ghidra instance at {url}", 503)
    except Exception as e:
        # Last-resort boundary: callers expect an error payload, never a raise.
        return _error_response(
            f"An unexpected error occurred: {str(e)}",
            exception=e.__class__.__name__,
        )
def safe_get(port: int, endpoint: str, params: dict = None) -> dict:
    """Make GET request to Ghidra instance.

    Args:
        port: Port of the target Ghidra instance.
        endpoint: Path relative to the instance URL.
        params: Optional query-string parameters.

    Returns:
        dict: Whatever _make_request returns for the GET call.
    """
    reply = _make_request("GET", port, endpoint, params=params)
    return reply
def safe_put(port: int, endpoint: str, data: dict) -> dict:
    """Make PUT request to Ghidra instance with JSON payload.

    A "headers" entry in ``data`` is not sent in the body; it is passed to
    _make_request as extra request headers instead.

    Args:
        port: Port of the target Ghidra instance.
        endpoint: Path relative to the instance URL.
        data: JSON payload; may carry an optional "headers" entry.

    Returns:
        dict: Whatever _make_request returns for the PUT call.
    """
    headers = None
    if isinstance(data, dict):
        # Work on a shallow copy so extracting "headers" does not mutate the
        # dict the caller handed in.
        data = dict(data)
        headers = data.pop("headers", None)
    return _make_request("PUT", port, endpoint, json_data=data, headers=headers)
def safe_post(port: int, endpoint: str, data: dict | str) -> dict:
    """Perform a POST request to a specific Ghidra instance with JSON or text payload.

    A dict is sent as a JSON body, with any "headers" entry removed from the
    body and passed on as extra request headers. Anything else is sent as a
    raw text body.

    Args:
        port: Port of the target Ghidra instance.
        endpoint: Path relative to the instance URL.
        data: JSON payload (dict) or text payload (str).

    Returns:
        dict: Whatever _make_request returns for the POST call.
    """
    if isinstance(data, dict):
        # Shallow copy: popping "headers" must not alter the caller's dict.
        json_payload = dict(data)
        headers = json_payload.pop("headers", None)
        return _make_request("POST", port, endpoint, json_data=json_payload, data=None, headers=headers)
    return _make_request("POST", port, endpoint, json_data=None, data=data, headers=None)
# Instance management tools
@mcp.tool()
def list_instances() -> dict:
    """List all active Ghidra instances

    Returns:
        dict: Contains 'instances' list with port, url, project and file info for each instance
    """
    summaries = []
    # Snapshot the registry under the lock; project/file fall back to "".
    with instances_lock:
        for port, info in active_instances.items():
            entry = {
                "port": port,
                "url": info["url"],
                "project": info.get("project", ""),
                "file": info.get("file", ""),
            }
            summaries.append(entry)
    return {"instances": summaries}
def _collect_root_info(url: str, project_info: dict) -> None:
    """Copy non-empty 'project' and 'file' values from GET {url}/ into project_info."""
    root_response = requests.get(f"{url}/", timeout=1.5)  # Short timeout for root
    if not root_response.ok:
        return
    try:
        root_data = root_response.json()
        for key in ("project", "file"):
            if key in root_data and root_data[key]:
                project_info[key] = root_data[key]
    except Exception as e:
        print(f"Error parsing root info: {e}", file=sys.stderr)


def _collect_detailed_info(url: str, project_info: dict) -> None:
    """Fill project_info from GET {url}/info; failures are logged to stderr, not raised."""
    try:
        info_response = requests.get(f"{url}/info", timeout=2)
    except Exception as e:
        print(f"Error connecting to info endpoint: {e}", file=sys.stderr)
        return
    if not info_response.ok:
        return
    try:
        info_data = info_response.json()
        if "project" in info_data and info_data["project"]:
            project_info["project"] = info_data["project"]
        file_info = info_data.get("file", {})
        # The /info endpoint is read as returning "file" as an object here;
        # its details are copied only when it has a non-empty "name".
        if isinstance(file_info, dict) and file_info.get("name"):
            project_info["file"] = file_info.get("name", "")
            project_info["path"] = file_info.get("path", "")
            project_info["architecture"] = file_info.get("architecture", "")
            project_info["endian"] = file_info.get("endian", "")
        print(f"Info data parsed: {project_info}", file=sys.stderr)
    except Exception as e:
        print(f"Error parsing info endpoint: {e}", file=sys.stderr)


@mcp.tool()
def register_instance(port: int, url: str = None) -> str:
    """Register a new Ghidra instance

    Args:
        port: Port number of the Ghidra instance
        url: Optional URL if different from default http://host:port

    Returns:
        str: Confirmation message or error
    """
    if url is None:
        url = f"http://{ghidra_host}:{port}"

    try:
        # The instance must answer /instances successfully to be registered.
        response = requests.get(f"{url}/instances", timeout=2)
        if not response.ok:
            return f"Error: Instance at {url} is not responding properly"

        project_info = {"url": url}
        try:
            _collect_root_info(url, project_info)
            # Fall back to /info only when the root endpoint gave neither value.
            if not project_info.get("project") and not project_info.get("file"):
                _collect_detailed_info(url, project_info)
        except Exception as e:
            # Non-critical, continue with registration even if project info
            # fails - but leave a trace on stderr instead of hiding it.
            print(f"Error fetching project info from {url}: {e}", file=sys.stderr)

        with instances_lock:
            active_instances[port] = project_info
        return f"Registered instance on port {port} at {url}"
    except Exception as e:
        return f"Error: Could not connect to instance at {url}: {str(e)}"
@mcp.tool()
def unregister_instance(port: int) -> str:
    """Unregister a Ghidra instance

    Args:
        port: Port number of the instance to unregister

    Returns:
        str: Confirmation message or error
    """
    with instances_lock:
        # Guard clause: nothing to remove for an unknown port.
        if port not in active_instances:
            return f"No instance found on port {port}"
        del active_instances[port]
        return f"Unregistered instance on port {port}"
@mcp.tool()
def discover_instances(host: str = None) -> dict:
    """Discover available Ghidra instances by scanning ports

    Args:
        host: Optional host to scan (default: configured ghidra_host)

    Returns:
        dict: Contains 'found' count and 'instances' list with discovery results
    """
    # The default must be None: `null` is not a Python name, so evaluating it
    # in the signature raised NameError as soon as the module was imported.
    # _discover_instances falls back to ghidra_host when host is None.
    return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5)
def _discover_instances(port_range, host=None, timeout=0.5) -> dict:
    """Internal function to discover Ghidra instances by scanning ports.

    Args:
        port_range: Iterable of port numbers to probe.
        host: Host to scan; ghidra_host is used when None.
        timeout: Per-probe timeout passed to requests.get.

    Returns:
        dict: 'found' (number of newly probed instances that answered) and
        'instances' (one {"port", "url", "result"} entry per such instance).
    """
    scan_host = ghidra_host if host is None else host
    discovered = []

    for port in port_range:
        # Ports already in the registry are not probed again.
        if port in active_instances:
            continue

        url = f"http://{scan_host}:{port}"
        try:
            probe = requests.get(f"{url}/instances", timeout=timeout)
            if not probe.ok:
                continue
            outcome = register_instance(port, url)
            discovered.append({"port": port, "url": url, "result": outcome})
        except requests.exceptions.RequestException:
            # Instance not available, just continue
            continue

    return {"found": len(discovered), "instances": discovered}
@mcp.tool()
def list_functions(port: int = DEFAULT_GHIDRA_PORT,
                   offset: int = 0,
                   limit: int = 100,
                   addr: str = None,
                   name: str = None,
                   name_contains: str = None,
                   name_matches_regex: str = None) -> dict:
    """List functions in the current program with filtering and pagination

    Args:
        port: Ghidra instance port (default: 8192)
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)
        addr: Filter by address (hexadecimal)
        name: Exact name match filter (case-sensitive)
        name_contains: Substring name filter (case-insensitive)
        name_matches_regex: Regex name filter

    Returns:
        dict: {
            "result": list of function info objects,
            "size": total count,
            "offset": current offset,
            "limit": current limit,
            "_links": pagination links
        }
    """
    # Pagination is always sent; a filter is forwarded only when it is truthy.
    optional_filters = (
        ("addr", addr),
        ("name", name),
        ("name_contains", name_contains),
        ("name_matches_regex", name_matches_regex),
    )
    query = {"offset": offset, "limit": limit}
    query.update((key, value) for key, value in optional_filters if value)

    reply = safe_get(port, "programs/current/functions", query)
    # Payloads carrying an "error" key are handed back untouched.
    if isinstance(reply, dict) and "error" in reply:
        return reply

    # Transform to expected format if needed
    items = reply.get("result", [])
    return {
        "result": items,
        "size": reply.get("size", len(items)),
        "offset": offset,
        "limit": limit,
        "_links": reply.get("_links", {}),
    }
@mcp.tool()
def list_classes(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list:
    """List classes in the current program with pagination

    Args:
        port: Ghidra instance port (default: 8192)
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)

    Returns:
        list: Class names and info
    """
    # The safe_get result is returned unchanged.
    paging = {"offset": offset, "limit": limit}
    return safe_get(port, "classes", paging)
@mcp.tool()
def get_function(port: int = DEFAULT_GHIDRA_PORT, name: str = "", cCode: bool = True, syntaxTree: bool = False, simplificationStyle: str = "normalize") -> dict:
    """Get decompiled code for a function

    Args:
        port: Ghidra instance port (default: 8192)
        name: Function name to decompile
        cCode: Return C-style code (default: True)
        syntaxTree: Include syntax tree (default: False)
        simplificationStyle: Decompiler style (default: "normalize")

    Returns:
        dict: Contains function name, address, signature and decompilation
    """
    # The name is URL-quoted into the path; booleans are sent as "true"/"false".
    endpoint = f"functions/{quote(name)}"
    options = {
        "cCode": str(cCode).lower(),
        "syntaxTree": str(syntaxTree).lower(),
        "simplificationStyle": simplificationStyle,
    }
    return safe_get(port, endpoint, options)
@mcp.tool()
def update_function(port: int = DEFAULT_GHIDRA_PORT, name: str = "", new_name: str = "") -> str:
    """Rename a function

    Args:
        port: Ghidra instance port (default: 8192)
        name: Current function name
        new_name: New function name

    Returns:
        str: Confirmation message or error
    """
    # POSTs {"newName": ...} to the URL-quoted function path.
    endpoint = f"functions/{quote(name)}"
    payload = {"newName": new_name}
    return safe_post(port, endpoint, payload)
@mcp.tool()
def update_data(port: int = DEFAULT_GHIDRA_PORT, address: str = "", new_name: str = "") -> str:
    """Rename data at a memory address

    Args:
        port: Ghidra instance port (default: 8192)
        address: Memory address in hex format
        new_name: New name for the data

    Returns:
        str: Confirmation message or error
    """
    # Address and new name travel together in the JSON body of the POST.
    payload = {"address": address, "newName": new_name}
    return safe_post(port, "data", payload)
@mcp.tool()
def list_segments(port: int = DEFAULT_GHIDRA_PORT,
                  offset: int = 0,
                  limit: int = 100,
                  name: str = None) -> dict:
    """List memory segments with filtering and pagination

    Args:
        port: Ghidra instance port (default: 8192)
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)
        name: Filter by segment name (case-sensitive substring match)

    Returns:
        dict: {
            "result": list of segment objects,
            "size": total count,
            "offset": current offset,
            "limit": current limit,
            "_links": pagination links
        }
    """
    # The name filter is only sent when it is truthy.
    query = {"offset": offset, "limit": limit}
    if name:
        query["name"] = name

    reply = safe_get(port, "programs/current/segments", query)
    if isinstance(reply, dict) and "error" in reply:
        return reply

    # Re-wrap into the paginated shape, echoing the requested offset/limit.
    items = reply.get("result", [])
    return {
        "result": items,
        "size": reply.get("size", len(items)),
        "offset": offset,
        "limit": limit,
        "_links": reply.get("_links", {}),
    }
@mcp.tool()
def list_symbols(port: int = DEFAULT_GHIDRA_PORT,
                 offset: int = 0,
                 limit: int = 100,
                 addr: str = None,
                 name: str = None,
                 name_contains: str = None,
                 type: str = None) -> dict:
    """List symbols with filtering and pagination

    Args:
        port: Ghidra instance port (default: 8192)
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)
        addr: Filter by address (hexadecimal)
        name: Exact name match filter (case-sensitive)
        name_contains: Substring name filter (case-insensitive)
        type: Filter by symbol type (e.g. "function", "data", "label")

    Returns:
        dict: {
            "result": list of symbol objects,
            "size": total count,
            "offset": current offset,
            "limit": current limit,
            "_links": pagination links
        }
    """
    # Only truthy filters are added to the query string.
    optional_filters = (
        ("addr", addr),
        ("name", name),
        ("name_contains", name_contains),
        ("type", type),
    )
    query = {"offset": offset, "limit": limit}
    query.update((key, value) for key, value in optional_filters if value)

    reply = safe_get(port, "programs/current/symbols", query)
    if isinstance(reply, dict) and "error" in reply:
        return reply

    items = reply.get("result", [])
    return {
        "result": items,
        "size": reply.get("size", len(items)),
        "offset": offset,
        "limit": limit,
        "_links": reply.get("_links", {}),
    }
@mcp.tool()
def list_imports(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> dict:
    """List imported symbols with pagination

    Args:
        port: Ghidra instance port (default: 8192)
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)

    Returns:
        dict: {
            "result": list of imported symbols,
            "size": total count,
            "offset": current offset,
            "limit": current limit,
            "_links": pagination links
        }
    """
    paging = {"offset": offset, "limit": limit}
    reply = safe_get(port, "programs/current/symbols/imports", paging)
    # Payloads carrying an "error" key are returned as received.
    if isinstance(reply, dict) and "error" in reply:
        return reply

    items = reply.get("result", [])
    return {
        "result": items,
        "size": reply.get("size", len(items)),
        "offset": offset,
        "limit": limit,
        "_links": reply.get("_links", {}),
    }
@mcp.tool()
def list_exports(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> dict:
    """List exported symbols with pagination

    Args:
        port: Ghidra instance port (default: 8192)
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)

    Returns:
        dict: {
            "result": list of exported symbols,
            "size": total count,
            "offset": current offset,
            "limit": current limit,
            "_links": pagination links
        }
    """
    paging = {"offset": offset, "limit": limit}
    reply = safe_get(port, "programs/current/symbols/exports", paging)
    # Payloads carrying an "error" key are returned as received.
    if isinstance(reply, dict) and "error" in reply:
        return reply

    items = reply.get("result", [])
    return {
        "result": items,
        "size": reply.get("size", len(items)),
        "offset": offset,
        "limit": limit,
        "_links": reply.get("_links", {}),
    }
@mcp.tool()
def list_namespaces(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list:
    """List namespaces with pagination

    Args:
        port: Ghidra instance port (default: 8192)
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)

    Returns:
        list: Namespace information strings
    """
    # The safe_get result is returned unchanged.
    paging = {"offset": offset, "limit": limit}
    return safe_get(port, "namespaces", paging)
@mcp.tool()
def list_data_items(port: int = DEFAULT_GHIDRA_PORT,
                    offset: int = 0,
                    limit: int = 100,
                    addr: str = None,
                    name: str = None,
                    name_contains: str = None,
                    type: str = None) -> dict:
    """List defined data items with filtering and pagination

    Args:
        port: Ghidra instance port (default: 8192)
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)
        addr: Filter by address (hexadecimal)
        name: Exact name match filter (case-sensitive)
        name_contains: Substring name filter (case-insensitive)
        type: Filter by data type (e.g. "string", "dword")

    Returns:
        dict: {
            "result": list of data item objects,
            "size": total count,
            "offset": current offset,
            "limit": current limit,
            "_links": pagination links
        }
    """
    # Only truthy filters are added to the query string.
    optional_filters = (
        ("addr", addr),
        ("name", name),
        ("name_contains", name_contains),
        ("type", type),
    )
    query = {"offset": offset, "limit": limit}
    query.update((key, value) for key, value in optional_filters if value)

    reply = safe_get(port, "programs/current/data", query)
    if isinstance(reply, dict) and "error" in reply:
        return reply

    items = reply.get("result", [])
    return {
        "result": items,
        "size": reply.get("size", len(items)),
        "offset": offset,
        "limit": limit,
        "_links": reply.get("_links", {}),
    }
@mcp.tool()
def search_functions_by_name(port: int = DEFAULT_GHIDRA_PORT, query: str = "", offset: int = 0, limit: int = 100) -> list:
    """Search functions by name with pagination

    Args:
        port: Ghidra instance port (default: 8192)
        query: Search string for function names
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)

    Returns:
        list: Matching function info or error if query empty
    """
    if query:
        search_params = {"query": query, "offset": offset, "limit": limit}
        return safe_get(port, "functions", search_params)
    # An empty query is rejected locally, without contacting Ghidra.
    return ["Error: query string is required"]
@mcp.tool()
def read_memory(port: int = DEFAULT_GHIDRA_PORT,
                address: str = "",
                length: int = 16,
                format: str = "hex") -> dict:
    """Read bytes from memory

    Args:
        port: Ghidra instance port (default: 8192)
        address: Memory address in hex format
        length: Number of bytes to read (default: 16)
        format: Output format - "hex", "base64", or "string" (default: "hex")

    Returns:
        dict: {
            "address": original address,
            "length": bytes read,
            "format": output format,
            "bytes": the memory contents,
            "timestamp": response timestamp
        }
    """
    # A missing address is rejected locally with an error payload.
    if not address:
        return {
            "success": False,
            "error": "Address parameter is required",
            "timestamp": int(time.time() * 1000),
        }

    query = {"address": address, "length": length, "format": format}
    reply = safe_get(port, "programs/current/memory", query)
    has_error = isinstance(reply, dict) and "error" in reply
    if has_error:
        return reply

    # Echo the request arguments alongside the returned bytes; the timestamp
    # falls back to "now" when the reply has none.
    fallback_timestamp = int(time.time() * 1000)
    return {
        "address": address,
        "length": length,
        "format": format,
        "bytes": reply.get("result", ""),
        "timestamp": reply.get("timestamp", fallback_timestamp),
    }
@mcp.tool()
def write_memory(port: int = DEFAULT_GHIDRA_PORT,
                 address: str = "",
                 bytes: str = "",
                 format: str = "hex") -> dict:
    """Write bytes to memory (use with caution)

    Args:
        port: Ghidra instance port (default: 8192)
        address: Memory address in hex format
        bytes: Data to write (format depends on 'format' parameter)
        format: Input format - "hex", "base64", or "string" (default: "hex")

    Returns:
        dict: Operation result with success status
    """
    # Both the address and the data must be non-empty before anything is sent.
    if not (address and bytes):
        return {
            "success": False,
            "error": "Address and bytes parameters are required",
            "timestamp": int(time.time() * 1000),
        }

    payload = {"address": address, "bytes": bytes, "format": format}
    return safe_post(port, "programs/current/memory", payload)
@mcp.tool()
def get_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> dict:
    """Get function details by memory address

    Args:
        port: Ghidra instance port (default: 8192)
        address: Memory address in hex format

    Returns:
        dict: Contains function name, address, signature and decompilation
    """
    # The address is sent as a query parameter; the reply is returned unchanged.
    lookup = {"address": address}
    return safe_get(port, "get_function_by_address", lookup)
@mcp.tool()
def get_current_address(port: int = DEFAULT_GHIDRA_PORT) -> dict:
    """Get the address currently selected in Ghidra's UI

    Args:
        port: Ghidra instance port (default: 8192)

    Returns:
        Dict containing:
        - success: boolean indicating success
        - result: object with address field
        - error: error message if failed
        - timestamp: timestamp of response
    """
    reply = safe_get(port, "get_current_address")
    # Anything that is not a dict with a "success" key is reported as a
    # malformed reply; well-formed replies are returned unchanged.
    if not isinstance(reply, dict) or "success" not in reply:
        return {
            "success": False,
            "error": "Unexpected response format from Ghidra plugin",
            "timestamp": int(time.time() * 1000),
            "port": port,
        }
    return reply
@mcp.tool()
def list_xrefs(port: int = DEFAULT_GHIDRA_PORT,
               to_addr: str = None,
               from_addr: str = None,
               type: str = None,
               offset: int = 0,
               limit: int = 100) -> dict:
    """List cross-references with filtering and pagination

    Args:
        port: Ghidra instance port (default: 8192)
        to_addr: Filter references to this address (hexadecimal)
        from_addr: Filter references from this address (hexadecimal)
        type: Filter by reference type (e.g. "CALL", "READ", "WRITE")
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)

    Returns:
        dict: {
            "result": list of xref objects,
            "size": total count,
            "offset": current offset,
            "limit": current limit,
            "_links": pagination links
        }
    """
    # Only truthy filters are added to the query string.
    optional_filters = (
        ("to_addr", to_addr),
        ("from_addr", from_addr),
        ("type", type),
    )
    query = {"offset": offset, "limit": limit}
    query.update((key, value) for key, value in optional_filters if value)

    reply = safe_get(port, "programs/current/xrefs", query)
    if isinstance(reply, dict) and "error" in reply:
        return reply

    items = reply.get("result", [])
    return {
        "result": items,
        "size": reply.get("size", len(items)),
        "offset": offset,
        "limit": limit,
        "_links": reply.get("_links", {}),
    }
@mcp.tool()
def analyze_program(port: int = DEFAULT_GHIDRA_PORT,
                    analysis_options: dict = None) -> dict:
    """Run analysis on the current program

    Args:
        port: Ghidra instance port (default: 8192)
        analysis_options: Dictionary of analysis options to enable/disable
                          (e.g. {"functionRecovery": True, "dataRefs": False})
                          None means use default analysis options

    Returns:
        dict: Analysis operation result with status
    """
    # A missing or empty options dict is sent as an empty JSON object.
    payload = analysis_options if analysis_options else {}
    return safe_post(port, "programs/current/analysis", payload)
@mcp.tool()
def get_callgraph(port: int = DEFAULT_GHIDRA_PORT,
                  function: str = None,
                  max_depth: int = 3) -> dict:
    """Get function call graph visualization data

    Args:
        port: Ghidra instance port (default: 8192)
        function: Starting function name (None starts from entry point)
        max_depth: Maximum call depth to analyze (default: 3)

    Returns:
        dict: Graph data in DOT format with nodes and edges
    """
    # The starting function is only included in the query when it is truthy.
    query = {"max_depth": max_depth}
    query.update({"function": function} if function else {})
    return safe_get(port, "programs/current/analysis/callgraph", query)
@mcp.tool()
def get_dataflow(port: int = DEFAULT_GHIDRA_PORT,
                 address: str = "",
                 direction: str = "forward",
                 max_steps: int = 50) -> dict:
    """Perform data flow analysis from an address

    Args:
        port: Ghidra instance port (default: 8192)
        address: Starting address in hex format
        direction: "forward" or "backward" (default: "forward")
        max_steps: Maximum analysis steps (default: 50)

    Returns:
        dict: Data flow analysis results
    """
    # All three arguments are forwarded as query parameters, unvalidated.
    query = {"address": address, "direction": direction, "max_steps": max_steps}
    return safe_get(port, "programs/current/analysis/dataflow", query)
@mcp.tool()
def get_current_function(port: int = DEFAULT_GHIDRA_PORT) -> dict:
    """Get the function currently selected in Ghidra's UI

    Args:
        port: Ghidra instance port (default: 8192)

    Returns:
        Dict containing:
        - success: boolean indicating success
        - result: object with name, address and signature fields
        - error: error message if failed
        - timestamp: timestamp of response
    """
    reply = safe_get(port, "get_current_function")
    # Anything that is not a dict with a "success" key is reported as a
    # malformed reply; well-formed replies are returned unchanged.
    if not isinstance(reply, dict) or "success" not in reply:
        return {
            "success": False,
            "error": "Unexpected response format from Ghidra plugin",
            "timestamp": int(time.time() * 1000),
            "port": port,
        }
    return reply
@mcp.tool()
def decompile_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "", cCode: bool = True, syntaxTree: bool = False, simplificationStyle: str = "normalize") -> dict:
    """Decompile function at memory address

    Args:
        port: Ghidra instance port (default: 8192)
        address: Memory address in hex format
        cCode: Return C-style code (default: True)
        syntaxTree: Include syntax tree (default: False)
        simplificationStyle: Decompiler style (default: "normalize")

    Returns:
        dict: Contains decompiled code in 'result.decompilation'
    """
    # Booleans are serialised as lowercase "true"/"false" strings.
    options = {
        "address": address,
        "cCode": str(cCode).lower(),
        "syntaxTree": str(syntaxTree).lower(),
        "simplificationStyle": simplificationStyle,
    }
    return safe_get(port, "decompile_function", options)
@mcp.tool()
def disassemble_function(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> dict:
    """Get disassembly for function at address

    Args:
        port: Ghidra instance port (default: 8192)
        address: Memory address in hex format

    Returns:
        dict: Contains assembly instructions with addresses and comments
    """
    # The address is sent as a query parameter; the reply is returned unchanged.
    lookup = {"address": address}
    return safe_get(port, "disassemble_function", lookup)
@mcp.tool()
def set_decompiler_comment(port: int = DEFAULT_GHIDRA_PORT, address: str = "", comment: str = "") -> str:
    """Add/edit decompiler comment at address

    Args:
        port: Ghidra instance port (default: 8192)
        address: Memory address in hex format
        comment: Comment text to add

    Returns:
        str: Confirmation message or error
    """
    # Address and comment text are sent together as the JSON body.
    payload = {"address": address, "comment": comment}
    return safe_post(port, "set_decompiler_comment", payload)
@mcp.tool()
def set_disassembly_comment(port: int = DEFAULT_GHIDRA_PORT, address: str = "", comment: str = "") -> str:
    """Add/edit disassembly comment at address

    Args:
        port: Ghidra instance port (default: 8192)
        address: Memory address in hex format
        comment: Comment text to add

    Returns:
        str: Confirmation message or error
    """
    # Address and comment text are sent together as the JSON body.
    payload = {"address": address, "comment": comment}
    return safe_post(port, "set_disassembly_comment", payload)
@mcp.tool()
def rename_local_variable(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", old_name: str = "", new_name: str = "") -> str:
    """Rename local variable in function

    Args:
        port: Ghidra instance port (default: 8192)
        function_address: Function memory address in hex
        old_name: Current variable name
        new_name: New variable name

    Returns:
        str: Confirmation message or error
    """
    # snake_case arguments map onto the camelCase keys used in the JSON body.
    payload = {
        "functionAddress": function_address,
        "oldName": old_name,
        "newName": new_name,
    }
    return safe_post(port, "rename_local_variable", payload)
@mcp.tool()
def rename_function_by_address(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", new_name: str = "") -> str:
    """Rename function at memory address

    Args:
        port: Ghidra instance port (default: 8192)
        function_address: Function memory address in hex
        new_name: New function name

    Returns:
        str: Confirmation message or error
    """
    # snake_case arguments map onto the camelCase keys used in the JSON body.
    payload = {"functionAddress": function_address, "newName": new_name}
    return safe_post(port, "rename_function_by_address", payload)
@mcp.tool()
def set_function_prototype(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", prototype: str = "") -> str:
    """Update function signature/prototype

    Args:
        port: Ghidra instance port (default: 8192)
        function_address: Function memory address in hex
        prototype: New prototype string (e.g. "int func(int param1)")

    Returns:
        str: Confirmation message or error
    """
    # The prototype string is forwarded as given.
    payload = {"functionAddress": function_address, "prototype": prototype}
    return safe_post(port, "set_function_prototype", payload)
@mcp.tool()
def set_local_variable_type(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", variable_name: str = "", new_type: str = "") -> str:
    """Change local variable data type

    Args:
        port: Ghidra instance port (default: 8192)
        function_address: Function memory address in hex
        variable_name: Variable name to modify
        new_type: New data type (e.g. "int", "char*")

    Returns:
        str: Confirmation message or error
    """
    # snake_case arguments map onto the camelCase keys used in the JSON body.
    payload = {
        "functionAddress": function_address,
        "variableName": variable_name,
        "newType": new_type,
    }
    return safe_post(port, "set_local_variable_type", payload)
@mcp.tool()
def list_variables(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100, search: str = "") -> dict:
    """List global variables with optional search

    Args:
        port: Ghidra instance port (default: 8192)
        offset: Pagination offset (default: 0)
        limit: Maximum items to return (default: 100)
        search: Optional filter for variable names

    Returns:
        dict: Contains variables list in 'result' field
    """
    # The search term is only sent when it is non-empty.
    query = {"offset": offset, "limit": limit}
    query.update({"search": search} if search else {})
    return safe_get(port, "variables", query)
@mcp.tool()
def list_function_variables(port: int = DEFAULT_GHIDRA_PORT, function: str = "") -> dict:
    """List variables in function

    Args:
        port: Ghidra instance port (default: 8192)
        function: Function name to list variables for

    Returns:
        dict: Contains variables list in 'result.variables'
    """
    if function:
        # The function name is URL-quoted before being placed in the path.
        endpoint = f"functions/{quote(function)}/variables"
        return safe_get(port, endpoint, {})
    return {"success": False, "error": "Function name is required"}
@mcp.tool()
def rename_variable(port: int = DEFAULT_GHIDRA_PORT, function: str = "", name: str = "", new_name: str = "") -> dict:
    """Rename variable in function

    Args:
        port: Ghidra instance port (default: 8192)
        function: Function name containing variable
        name: Current variable name
        new_name: New variable name

    Returns:
        dict: Operation result
    """
    # All three values must be non-empty before anything is sent.
    if not all((function, name, new_name)):
        return {"success": False, "error": "Function, name, and new_name parameters are required"}

    # Both path segments are URL-quoted.
    endpoint = f"functions/{quote(function)}/variables/{quote(name)}"
    return safe_post(port, endpoint, {"newName": new_name})
@mcp.tool()
def retype_variable(port: int = DEFAULT_GHIDRA_PORT, function: str = "", name: str = "", data_type: str = "") -> dict:
    """Change variable data type in function

    Args:
        port: Ghidra instance port (default: 8192)
        function: Function name containing variable
        name: Variable name to modify
        data_type: New data type

    Returns:
        dict: Operation result
    """
    # All three values must be non-empty before anything is sent.
    if not all((function, name, data_type)):
        return {"success": False, "error": "Function, name, and data_type parameters are required"}

    # Both path segments are URL-quoted.
    endpoint = f"functions/{quote(function)}/variables/{quote(name)}"
    return safe_post(port, endpoint, {"dataType": data_type})
def handle_sigint(signum, frame):
    """Signal handler: terminate the process immediately with exit status 0.

    Args:
        signum: Signal number passed by the signal module (unused).
        frame: Current stack frame passed by the signal module (unused).
    """
    exit_status = 0
    os._exit(exit_status)
def periodic_discovery():
    """Periodically discover new instances and drop unreachable ones.

    Runs forever. Each cycle scans FULL_DISCOVERY_RANGE for new instances,
    probes ``/instances`` on every registered one, removes those that fail
    the probe, then sleeps for 30 seconds. Errors are logged, never raised.

    All diagnostics go to stderr: the server is run with the stdio
    transport, where stdout carries the protocol stream and must not
    receive log text.
    """
    while True:
        try:
            _discover_instances(FULL_DISCOVERY_RANGE, timeout=0.5)

            # Snapshot under the lock, then probe without it so slow or dead
            # instances do not stall every other user of the registry.
            with instances_lock:
                snapshot = [(port, info["url"]) for port, info in active_instances.items()]

            unreachable = {}
            for port, url in snapshot:
                try:
                    response = requests.get(f"{url}/instances", timeout=1)
                    if not response.ok:
                        unreachable[port] = url
                except requests.exceptions.RequestException:
                    unreachable[port] = url

            with instances_lock:
                for port, url in unreachable.items():
                    # Skip entries re-registered under another URL meanwhile.
                    current = active_instances.get(port)
                    if current is not None and current.get("url") == url:
                        del active_instances[port]
                        print(f"Removed unreachable instance on port {port}", file=sys.stderr)
        except Exception as e:
            print(f"Error in periodic discovery: {e}", file=sys.stderr)

        time.sleep(30)
if __name__ == "__main__":
    # Try the default instance first, then run a quick scan for others.
    default_url = f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}"
    register_instance(DEFAULT_GHIDRA_PORT, default_url)
    discover_instances()

    # Background refresh of the instance registry; a daemon thread does not
    # keep the process alive on exit.
    discovery_thread = threading.Thread(
        name="GhydraMCP-Discovery",
        target=periodic_discovery,
        daemon=True,
    )
    discovery_thread.start()

    # Install the SIGINT handler, then serve MCP over stdio.
    signal.signal(signal.SIGINT, handle_sigint)
    mcp.run(transport="stdio")