1335 lines
42 KiB
Python
1335 lines
42 KiB
Python
# /// script
|
|
# requires-python = ">=3.11"
|
|
# dependencies = [
|
|
# "mcp==1.6.0",
|
|
# "requests==2.32.3",
|
|
# ]
|
|
# ///
|
|
import os
|
|
import signal
|
|
import sys
|
|
import threading
|
|
import time
|
|
from threading import Lock
|
|
from typing import Dict, List
|
|
from urllib.parse import quote
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
from mcp.server.fastmcp import FastMCP
|
|
|
|
# Allowed origins for CORS/CSRF protection
|
|
ALLOWED_ORIGINS = os.environ.get(
|
|
"GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",")
|
|
|
|
# Track active Ghidra instances (port -> info dict)
|
|
active_instances: Dict[int, dict] = {}
|
|
instances_lock = Lock()
|
|
DEFAULT_GHIDRA_PORT = 8192
|
|
DEFAULT_GHIDRA_HOST = "localhost"
|
|
# Port ranges for scanning
|
|
QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10)
|
|
FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20)
|
|
|
|
instructions = """
|
|
GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware.
|
|
|
|
First, run `discover_instances` to find open Ghidra instances. List tools to see what GhydraMCP can do.
|
|
"""
|
|
|
|
mcp = FastMCP("GhydraMCP", instructions=instructions)
|
|
|
|
ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST)
|
|
|
|
|
|
def get_instance_url(port: int) -> str:
|
|
"""Get URL for a Ghidra instance by port"""
|
|
with instances_lock:
|
|
if port in active_instances:
|
|
return active_instances[port]["url"]
|
|
|
|
if 8192 <= port <= 65535:
|
|
register_instance(port)
|
|
if port in active_instances:
|
|
return active_instances[port]["url"]
|
|
|
|
return f"http://{ghidra_host}:{port}"
|
|
|
|
|
|
def validate_origin(headers: dict) -> bool:
|
|
"""Validate request origin against allowed origins"""
|
|
origin = headers.get("Origin")
|
|
if not origin:
|
|
# No origin header - allow (browser same-origin policy applies)
|
|
return True
|
|
|
|
# Parse origin to get scheme+hostname
|
|
try:
|
|
parsed = urlparse(origin)
|
|
origin_base = f"{parsed.scheme}://{parsed.hostname}"
|
|
if parsed.port:
|
|
origin_base += f":{parsed.port}"
|
|
except:
|
|
return False
|
|
|
|
return origin_base in ALLOWED_ORIGINS
|
|
|
|
|
|
def _make_request(method: str, port: int, endpoint: str, params: dict = None, json_data: dict = None, data: str = None, headers: dict = None) -> dict:
|
|
"""Internal helper to make HTTP requests and handle common errors."""
|
|
url = f"{get_instance_url(port)}/{endpoint}"
|
|
request_headers = {'Accept': 'application/json'}
|
|
if headers:
|
|
request_headers.update(headers)
|
|
|
|
is_state_changing = method.upper() in ["POST", "PUT", "PATCH", "DELETE"]
|
|
if is_state_changing:
|
|
check_headers = json_data.get("headers", {}) if isinstance(
|
|
json_data, dict) else (headers or {})
|
|
if not validate_origin(check_headers):
|
|
return {
|
|
"success": False,
|
|
"error": "Origin not allowed",
|
|
"status_code": 403,
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
if json_data is not None:
|
|
request_headers['Content-Type'] = 'application/json'
|
|
elif data is not None:
|
|
request_headers['Content-Type'] = 'text/plain'
|
|
|
|
try:
|
|
response = requests.request(
|
|
method,
|
|
url,
|
|
params=params,
|
|
json=json_data,
|
|
data=data,
|
|
headers=request_headers,
|
|
timeout=10
|
|
)
|
|
|
|
try:
|
|
parsed_json = response.json()
|
|
if isinstance(parsed_json, dict) and "timestamp" not in parsed_json:
|
|
parsed_json["timestamp"] = int(time.time() * 1000)
|
|
return parsed_json
|
|
except ValueError:
|
|
if response.ok:
|
|
return {
|
|
"success": False,
|
|
"error": "Received non-JSON success response from Ghidra plugin",
|
|
"status_code": response.status_code,
|
|
"response_text": response.text[:500],
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
else:
|
|
return {
|
|
"success": False,
|
|
"error": f"HTTP {response.status_code} - Non-JSON error response",
|
|
"status_code": response.status_code,
|
|
"response_text": response.text[:500],
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
|
|
except requests.exceptions.Timeout:
|
|
return {
|
|
"success": False,
|
|
"error": "Request timed out",
|
|
"status_code": 408,
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
except requests.exceptions.ConnectionError:
|
|
return {
|
|
"success": False,
|
|
"error": f"Failed to connect to Ghidra instance at {url}",
|
|
"status_code": 503,
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
except Exception as e:
|
|
return {
|
|
"success": False,
|
|
"error": f"An unexpected error occurred: {str(e)}",
|
|
"exception": e.__class__.__name__,
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
|
|
|
|
def safe_get(port: int, endpoint: str, params: dict = None) -> dict:
|
|
"""Make GET request to Ghidra instance"""
|
|
return _make_request("GET", port, endpoint, params=params)
|
|
|
|
|
|
def safe_put(port: int, endpoint: str, data: dict) -> dict:
|
|
"""Make PUT request to Ghidra instance with JSON payload"""
|
|
headers = data.pop("headers", None) if isinstance(data, dict) else None
|
|
return _make_request("PUT", port, endpoint, json_data=data, headers=headers)
|
|
|
|
|
|
def safe_post(port: int, endpoint: str, data: dict | str) -> dict:
|
|
"""Perform a POST request to a specific Ghidra instance with JSON or text payload"""
|
|
headers = None
|
|
json_payload = None
|
|
text_payload = None
|
|
|
|
if isinstance(data, dict):
|
|
headers = data.pop("headers", None)
|
|
json_payload = data
|
|
else:
|
|
text_payload = data
|
|
|
|
return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers)
|
|
|
|
# Instance management tools
|
|
|
|
|
|
@mcp.tool()
|
|
def list_instances() -> dict:
|
|
"""List all active Ghidra instances
|
|
|
|
Returns:
|
|
dict: Contains 'instances' list with port, url, project and file info for each instance
|
|
"""
|
|
with instances_lock:
|
|
return {
|
|
"instances": [
|
|
{
|
|
"port": port,
|
|
"url": info["url"],
|
|
"project": info.get("project", ""),
|
|
"file": info.get("file", "")
|
|
}
|
|
for port, info in active_instances.items()
|
|
]
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def register_instance(port: int, url: str = None) -> str:
|
|
"""Register a new Ghidra instance
|
|
|
|
Args:
|
|
port: Port number of the Ghidra instance
|
|
url: Optional URL if different from default http://host:port
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
if url is None:
|
|
url = f"http://{ghidra_host}:{port}"
|
|
|
|
try:
|
|
test_url = f"{url}/instances"
|
|
response = requests.get(test_url, timeout=2)
|
|
if not response.ok:
|
|
return f"Error: Instance at {url} is not responding properly"
|
|
|
|
project_info = {"url": url}
|
|
|
|
try:
|
|
root_url = f"{url}/"
|
|
root_response = requests.get(
|
|
root_url, timeout=1.5) # Short timeout for root
|
|
|
|
if root_response.ok:
|
|
try:
|
|
root_data = root_response.json()
|
|
|
|
if "project" in root_data and root_data["project"]:
|
|
project_info["project"] = root_data["project"]
|
|
if "file" in root_data and root_data["file"]:
|
|
project_info["file"] = root_data["file"]
|
|
|
|
except Exception as e:
|
|
print(f"Error parsing root info: {e}", file=sys.stderr)
|
|
|
|
if not project_info.get("project") and not project_info.get("file"):
|
|
info_url = f"{url}/info"
|
|
|
|
try:
|
|
info_response = requests.get(info_url, timeout=2)
|
|
if info_response.ok:
|
|
try:
|
|
info_data = info_response.json()
|
|
if "project" in info_data and info_data["project"]:
|
|
project_info["project"] = info_data["project"]
|
|
|
|
file_info = info_data.get("file", {})
|
|
if isinstance(file_info, dict) and file_info.get("name"):
|
|
project_info["file"] = file_info.get(
|
|
"name", "")
|
|
project_info["path"] = file_info.get(
|
|
"path", "")
|
|
project_info["architecture"] = file_info.get(
|
|
"architecture", "")
|
|
project_info["endian"] = file_info.get(
|
|
"endian", "")
|
|
print(
|
|
f"Info data parsed: {project_info}", file=sys.stderr)
|
|
except Exception as e:
|
|
print(
|
|
f"Error parsing info endpoint: {e}", file=sys.stderr)
|
|
except Exception as e:
|
|
print(
|
|
f"Error connecting to info endpoint: {e}", file=sys.stderr)
|
|
except Exception:
|
|
# Non-critical, continue with registration even if project info fails
|
|
pass
|
|
|
|
with instances_lock:
|
|
active_instances[port] = project_info
|
|
|
|
return f"Registered instance on port {port} at {url}"
|
|
except Exception as e:
|
|
return f"Error: Could not connect to instance at {url}: {str(e)}"
|
|
|
|
|
|
@mcp.tool()
|
|
def unregister_instance(port: int) -> str:
|
|
"""Unregister a Ghidra instance
|
|
|
|
Args:
|
|
port: Port number of the instance to unregister
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
with instances_lock:
|
|
if port in active_instances:
|
|
del active_instances[port]
|
|
return f"Unregistered instance on port {port}"
|
|
return f"No instance found on port {port}"
|
|
|
|
|
|
@mcp.tool()
|
|
def discover_instances(host: str = None) -> dict:
|
|
"""Discover available Ghidra instances by scanning ports
|
|
|
|
Args:
|
|
host: Optional host to scan (default: configured ghidra_host)
|
|
|
|
Returns:
|
|
dict: Contains 'found' count and 'instances' list with discovery results
|
|
"""
|
|
return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5)
|
|
|
|
|
|
def _discover_instances(port_range, host=None, timeout=0.5) -> dict:
|
|
"""Internal function to discover Ghidra instances by scanning ports"""
|
|
found_instances = []
|
|
scan_host = host if host is not None else ghidra_host
|
|
|
|
for port in port_range:
|
|
if port in active_instances:
|
|
continue
|
|
|
|
url = f"http://{scan_host}:{port}"
|
|
try:
|
|
test_url = f"{url}/instances"
|
|
response = requests.get(test_url, timeout=timeout)
|
|
if response.ok:
|
|
result = register_instance(port, url)
|
|
found_instances.append(
|
|
{"port": port, "url": url, "result": result})
|
|
except requests.exceptions.RequestException:
|
|
# Instance not available, just continue
|
|
continue
|
|
|
|
return {
|
|
"found": len(found_instances),
|
|
"instances": found_instances
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_programs(port: int = DEFAULT_GHIDRA_PORT,
|
|
offset: int = 0,
|
|
limit: int = 100,
|
|
project: str = None) -> dict:
|
|
"""List all programs across all projects with filtering and pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
project: Filter by project name
|
|
|
|
Returns:
|
|
dict: {
|
|
"result": list of program info objects,
|
|
"size": total count,
|
|
"offset": current offset,
|
|
"limit": current limit,
|
|
"_links": pagination links
|
|
}
|
|
"""
|
|
params = {
|
|
"offset": offset,
|
|
"limit": limit
|
|
}
|
|
if project:
|
|
params["project"] = project
|
|
|
|
response = safe_get(port, "programs", params)
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
return {
|
|
"result": response.get("result", []),
|
|
"size": response.get("size", len(response.get("result", []))),
|
|
"offset": offset,
|
|
"limit": limit,
|
|
"_links": response.get("_links", {})
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def get_current_program(port: int = DEFAULT_GHIDRA_PORT) -> dict:
|
|
"""Get details about the currently loaded program
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
|
|
Returns:
|
|
dict: Program information including name, ID, language, etc.
|
|
"""
|
|
response = safe_get(port, "programs/current")
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
return response
|
|
|
|
|
|
@mcp.tool()
|
|
def list_functions(port: int = DEFAULT_GHIDRA_PORT,
|
|
offset: int = 0,
|
|
limit: int = 100,
|
|
addr: str = None,
|
|
name: str = None,
|
|
name_contains: str = None,
|
|
name_matches_regex: str = None) -> dict:
|
|
"""List functions in the current program with filtering and pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
addr: Filter by address (hexadecimal)
|
|
name: Exact name match filter (case-sensitive)
|
|
name_contains: Substring name filter (case-insensitive)
|
|
name_matches_regex: Regex name filter
|
|
|
|
Returns:
|
|
dict: {
|
|
"result": list of function info objects,
|
|
"size": total count,
|
|
"offset": current offset,
|
|
"limit": current limit,
|
|
"_links": pagination links
|
|
}
|
|
"""
|
|
params = {
|
|
"offset": offset,
|
|
"limit": limit
|
|
}
|
|
if addr:
|
|
params["addr"] = addr
|
|
if name:
|
|
params["name"] = name
|
|
if name_contains:
|
|
params["name_contains"] = name_contains
|
|
if name_matches_regex:
|
|
params["name_matches_regex"] = name_matches_regex
|
|
|
|
response = safe_get(port, "programs/current/functions", params)
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
# Transform to expected format if needed
|
|
return {
|
|
"result": response.get("result", []),
|
|
"size": response.get("size", len(response.get("result", []))),
|
|
"offset": offset,
|
|
"limit": limit,
|
|
"_links": response.get("_links", {})
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_classes(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list:
|
|
"""List classes in the current program with pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
|
|
Returns:
|
|
list: Class names and info
|
|
"""
|
|
return safe_get(port, "classes", {"offset": offset, "limit": limit})
|
|
|
|
|
|
@mcp.tool()
|
|
def get_function(port: int = DEFAULT_GHIDRA_PORT, name: str = "", cCode: bool = True, syntaxTree: bool = False, simplificationStyle: str = "normalize") -> dict:
|
|
"""Get decompiled code for a function
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
name: Function name to decompile
|
|
cCode: Return C-style code (default: True)
|
|
syntaxTree: Include syntax tree (default: False)
|
|
simplificationStyle: Decompiler style (default: "normalize")
|
|
|
|
Returns:
|
|
dict: Contains function name, address, signature and decompilation
|
|
"""
|
|
return safe_get(port, f"functions/{quote(name)}", {
|
|
"cCode": str(cCode).lower(),
|
|
"syntaxTree": str(syntaxTree).lower(),
|
|
"simplificationStyle": simplificationStyle
|
|
})
|
|
|
|
|
|
@mcp.tool()
|
|
def update_function(port: int = DEFAULT_GHIDRA_PORT, name: str = "", new_name: str = "") -> str:
|
|
"""Rename a function
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
name: Current function name
|
|
new_name: New function name
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
return safe_post(port, f"functions/{quote(name)}", {"newName": new_name})
|
|
|
|
|
|
@mcp.tool()
|
|
def update_data(port: int = DEFAULT_GHIDRA_PORT, address: str = "", new_name: str = "") -> str:
|
|
"""Rename data at a memory address
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
address: Memory address in hex format
|
|
new_name: New name for the data
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
return safe_post(port, "data", {"address": address, "newName": new_name})
|
|
|
|
|
|
@mcp.tool()
|
|
def list_segments(port: int = DEFAULT_GHIDRA_PORT,
|
|
offset: int = 0,
|
|
limit: int = 100,
|
|
name: str = None) -> dict:
|
|
"""List memory segments with filtering and pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
name: Filter by segment name (case-sensitive substring match)
|
|
|
|
Returns:
|
|
dict: {
|
|
"result": list of segment objects,
|
|
"size": total count,
|
|
"offset": current offset,
|
|
"limit": current limit,
|
|
"_links": pagination links
|
|
}
|
|
"""
|
|
params = {
|
|
"offset": offset,
|
|
"limit": limit
|
|
}
|
|
if name:
|
|
params["name"] = name
|
|
|
|
response = safe_get(port, "programs/current/segments", params)
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
return {
|
|
"result": response.get("result", []),
|
|
"size": response.get("size", len(response.get("result", []))),
|
|
"offset": offset,
|
|
"limit": limit,
|
|
"_links": response.get("_links", {})
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_symbols(port: int = DEFAULT_GHIDRA_PORT,
|
|
offset: int = 0,
|
|
limit: int = 100,
|
|
addr: str = None,
|
|
name: str = None,
|
|
name_contains: str = None,
|
|
type: str = None) -> dict:
|
|
"""List symbols with filtering and pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
addr: Filter by address (hexadecimal)
|
|
name: Exact name match filter (case-sensitive)
|
|
name_contains: Substring name filter (case-insensitive)
|
|
type: Filter by symbol type (e.g. "function", "data", "label")
|
|
|
|
Returns:
|
|
dict: {
|
|
"result": list of symbol objects,
|
|
"size": total count,
|
|
"offset": current offset,
|
|
"limit": current limit,
|
|
"_links": pagination links
|
|
}
|
|
"""
|
|
params = {
|
|
"offset": offset,
|
|
"limit": limit
|
|
}
|
|
if addr:
|
|
params["addr"] = addr
|
|
if name:
|
|
params["name"] = name
|
|
if name_contains:
|
|
params["name_contains"] = name_contains
|
|
if type:
|
|
params["type"] = type
|
|
|
|
response = safe_get(port, "programs/current/symbols", params)
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
return {
|
|
"result": response.get("result", []),
|
|
"size": response.get("size", len(response.get("result", []))),
|
|
"offset": offset,
|
|
"limit": limit,
|
|
"_links": response.get("_links", {})
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_imports(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> dict:
|
|
"""List imported symbols with pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
|
|
Returns:
|
|
dict: {
|
|
"result": list of imported symbols,
|
|
"size": total count,
|
|
"offset": current offset,
|
|
"limit": current limit,
|
|
"_links": pagination links
|
|
}
|
|
"""
|
|
response = safe_get(port, "programs/current/symbols/imports",
|
|
{"offset": offset, "limit": limit})
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
return {
|
|
"result": response.get("result", []),
|
|
"size": response.get("size", len(response.get("result", []))),
|
|
"offset": offset,
|
|
"limit": limit,
|
|
"_links": response.get("_links", {})
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_exports(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> dict:
|
|
"""List exported symbols with pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
|
|
Returns:
|
|
dict: {
|
|
"result": list of exported symbols,
|
|
"size": total count,
|
|
"offset": current offset,
|
|
"limit": current limit,
|
|
"_links": pagination links
|
|
}
|
|
"""
|
|
response = safe_get(port, "programs/current/symbols/exports",
|
|
{"offset": offset, "limit": limit})
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
return {
|
|
"result": response.get("result", []),
|
|
"size": response.get("size", len(response.get("result", []))),
|
|
"offset": offset,
|
|
"limit": limit,
|
|
"_links": response.get("_links", {})
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_namespaces(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list:
|
|
"""List namespaces with pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
|
|
Returns:
|
|
list: Namespace information strings
|
|
"""
|
|
return safe_get(port, "namespaces", {"offset": offset, "limit": limit})
|
|
|
|
|
|
@mcp.tool()
|
|
def list_data_items(port: int = DEFAULT_GHIDRA_PORT,
|
|
offset: int = 0,
|
|
limit: int = 100,
|
|
addr: str = None,
|
|
name: str = None,
|
|
name_contains: str = None,
|
|
type: str = None) -> dict:
|
|
"""List defined data items with filtering and pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
addr: Filter by address (hexadecimal)
|
|
name: Exact name match filter (case-sensitive)
|
|
name_contains: Substring name filter (case-insensitive)
|
|
type: Filter by data type (e.g. "string", "dword")
|
|
|
|
Returns:
|
|
dict: {
|
|
"result": list of data item objects,
|
|
"size": total count,
|
|
"offset": current offset,
|
|
"limit": current limit,
|
|
"_links": pagination links
|
|
}
|
|
"""
|
|
params = {
|
|
"offset": offset,
|
|
"limit": limit
|
|
}
|
|
if addr:
|
|
params["addr"] = addr
|
|
if name:
|
|
params["name"] = name
|
|
if name_contains:
|
|
params["name_contains"] = name_contains
|
|
if type:
|
|
params["type"] = type
|
|
|
|
response = safe_get(port, "programs/current/data", params)
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
return {
|
|
"result": response.get("result", []),
|
|
"size": response.get("size", len(response.get("result", []))),
|
|
"offset": offset,
|
|
"limit": limit,
|
|
"_links": response.get("_links", {})
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def search_functions_by_name(port: int = DEFAULT_GHIDRA_PORT, query: str = "", offset: int = 0, limit: int = 100) -> list:
|
|
"""Search functions by name with pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
query: Search string for function names
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
|
|
Returns:
|
|
list: Matching function info or error if query empty
|
|
"""
|
|
if not query:
|
|
return ["Error: query string is required"]
|
|
return safe_get(port, "functions", {"query": query, "offset": offset, "limit": limit})
|
|
|
|
|
|
@mcp.tool()
|
|
def read_memory(port: int = DEFAULT_GHIDRA_PORT,
|
|
address: str = "",
|
|
length: int = 16,
|
|
format: str = "hex") -> dict:
|
|
"""Read bytes from memory
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
address: Memory address in hex format
|
|
length: Number of bytes to read (default: 16)
|
|
format: Output format - "hex", "base64", or "string" (default: "hex")
|
|
|
|
Returns:
|
|
dict: {
|
|
"address": original address,
|
|
"length": bytes read,
|
|
"format": output format,
|
|
"bytes": the memory contents,
|
|
"timestamp": response timestamp
|
|
}
|
|
"""
|
|
if not address:
|
|
return {
|
|
"success": False,
|
|
"error": "Address parameter is required",
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
|
|
response = safe_get(port, "programs/current/memory", {
|
|
"address": address,
|
|
"length": length,
|
|
"format": format
|
|
})
|
|
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
return {
|
|
"address": address,
|
|
"length": length,
|
|
"format": format,
|
|
"bytes": response.get("result", ""),
|
|
"timestamp": response.get("timestamp", int(time.time() * 1000))
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def write_memory(port: int = DEFAULT_GHIDRA_PORT,
|
|
address: str = "",
|
|
bytes: str = "",
|
|
format: str = "hex") -> dict:
|
|
"""Write bytes to memory (use with caution)
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
address: Memory address in hex format
|
|
bytes: Data to write (format depends on 'format' parameter)
|
|
format: Input format - "hex", "base64", or "string" (default: "hex")
|
|
|
|
Returns:
|
|
dict: Operation result with success status
|
|
"""
|
|
if not address or not bytes:
|
|
return {
|
|
"success": False,
|
|
"error": "Address and bytes parameters are required",
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
|
|
return safe_post(port, "programs/current/memory", {
|
|
"address": address,
|
|
"bytes": bytes,
|
|
"format": format
|
|
})
|
|
|
|
|
|
@mcp.tool()
|
|
def get_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> dict:
|
|
"""Get function details by memory address
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
address: Memory address in hex format
|
|
|
|
Returns:
|
|
dict: Contains function name, address, signature and decompilation
|
|
"""
|
|
if not address:
|
|
return {
|
|
"success": False,
|
|
"error": "Address parameter is required",
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
|
|
# Use the HATEOAS endpoint
|
|
response = safe_get(port, f"programs/current/functions/{address}")
|
|
|
|
# Format the response for consistency
|
|
if isinstance(response, dict) and "success" in response and response["success"]:
|
|
# Add timestamp if not present
|
|
if "timestamp" not in response:
|
|
response["timestamp"] = int(time.time() * 1000)
|
|
# Add port for tracking
|
|
response["port"] = port
|
|
|
|
return response
|
|
|
|
|
|
@mcp.tool()
|
|
def get_current_address(port: int = DEFAULT_GHIDRA_PORT) -> dict:
|
|
"""Get the address currently selected in Ghidra's UI
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
|
|
Returns:
|
|
Dict containing:
|
|
- success: boolean indicating success
|
|
- result: object with address field
|
|
- error: error message if failed
|
|
- timestamp: timestamp of response
|
|
"""
|
|
response = safe_get(port, "get_current_address")
|
|
if isinstance(response, dict) and "success" in response:
|
|
return response
|
|
return {
|
|
"success": False,
|
|
"error": "Unexpected response format from Ghidra plugin",
|
|
"timestamp": int(time.time() * 1000),
|
|
"port": port
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def list_xrefs(port: int = DEFAULT_GHIDRA_PORT,
|
|
to_addr: str = None,
|
|
from_addr: str = None,
|
|
type: str = None,
|
|
offset: int = 0,
|
|
limit: int = 100) -> dict:
|
|
"""List cross-references with filtering and pagination
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
to_addr: Filter references to this address (hexadecimal)
|
|
from_addr: Filter references from this address (hexadecimal)
|
|
type: Filter by reference type (e.g. "CALL", "READ", "WRITE")
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
|
|
Returns:
|
|
dict: {
|
|
"result": list of xref objects,
|
|
"size": total count,
|
|
"offset": current offset,
|
|
"limit": current limit,
|
|
"_links": pagination links
|
|
}
|
|
"""
|
|
params = {
|
|
"offset": offset,
|
|
"limit": limit
|
|
}
|
|
if to_addr:
|
|
params["to_addr"] = to_addr
|
|
if from_addr:
|
|
params["from_addr"] = from_addr
|
|
if type:
|
|
params["type"] = type
|
|
|
|
response = safe_get(port, "programs/current/xrefs", params)
|
|
if isinstance(response, dict) and "error" in response:
|
|
return response
|
|
|
|
return {
|
|
"result": response.get("result", []),
|
|
"size": response.get("size", len(response.get("result", []))),
|
|
"offset": offset,
|
|
"limit": limit,
|
|
"_links": response.get("_links", {})
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def analyze_program(port: int = DEFAULT_GHIDRA_PORT,
|
|
analysis_options: dict = None) -> dict:
|
|
"""Run analysis on the current program
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
analysis_options: Dictionary of analysis options to enable/disable
|
|
(e.g. {"functionRecovery": True, "dataRefs": False})
|
|
None means use default analysis options
|
|
|
|
Returns:
|
|
dict: Analysis operation result with status
|
|
"""
|
|
return safe_post(port, "programs/current/analysis", analysis_options or {})
|
|
|
|
|
|
@mcp.tool()
|
|
def get_callgraph(port: int = DEFAULT_GHIDRA_PORT,
|
|
function: str = None,
|
|
max_depth: int = 3) -> dict:
|
|
"""Get function call graph visualization data
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
function: Starting function name (None starts from entry point)
|
|
max_depth: Maximum call depth to analyze (default: 3)
|
|
|
|
Returns:
|
|
dict: Graph data in DOT format with nodes and edges
|
|
"""
|
|
params = {"max_depth": max_depth}
|
|
if function:
|
|
params["function"] = function
|
|
|
|
return safe_get(port, "programs/current/analysis/callgraph", params)
|
|
|
|
|
|
@mcp.tool()
|
|
def get_dataflow(port: int = DEFAULT_GHIDRA_PORT,
|
|
address: str = "",
|
|
direction: str = "forward",
|
|
max_steps: int = 50) -> dict:
|
|
"""Perform data flow analysis from an address
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
address: Starting address in hex format
|
|
direction: "forward" or "backward" (default: "forward")
|
|
max_steps: Maximum analysis steps (default: 50)
|
|
|
|
Returns:
|
|
dict: Data flow analysis results
|
|
"""
|
|
return safe_get(port, "programs/current/analysis/dataflow", {
|
|
"address": address,
|
|
"direction": direction,
|
|
"max_steps": max_steps
|
|
})
|
|
|
|
|
|
@mcp.tool()
|
|
def get_current_function(port: int = DEFAULT_GHIDRA_PORT) -> dict:
|
|
"""Get the function currently selected in Ghidra's UI
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
|
|
Returns:
|
|
Dict containing:
|
|
- success: boolean indicating success
|
|
- result: object with name, address and signature fields
|
|
- error: error message if failed
|
|
- timestamp: timestamp of response
|
|
"""
|
|
response = safe_get(port, "get_current_function")
|
|
if isinstance(response, dict) and "success" in response:
|
|
return response
|
|
return {
|
|
"success": False,
|
|
"error": "Unexpected response format from Ghidra plugin",
|
|
"timestamp": int(time.time() * 1000),
|
|
"port": port
|
|
}
|
|
|
|
|
|
@mcp.tool()
|
|
def decompile_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "", cCode: bool = True, syntaxTree: bool = False, simplificationStyle: str = "normalize") -> dict:
|
|
"""Decompile function at memory address
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
address: Memory address in hex format
|
|
cCode: Return C-style code (default: True)
|
|
syntaxTree: Include syntax tree (default: False)
|
|
simplificationStyle: Decompiler style (default: "normalize")
|
|
|
|
Returns:
|
|
dict: Contains decompiled code and function information
|
|
"""
|
|
if not address:
|
|
return {
|
|
"success": False,
|
|
"error": "Address parameter is required",
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
|
|
# Use the HATEOAS endpoint
|
|
params = {
|
|
"syntax_tree": str(syntaxTree).lower(),
|
|
"style": simplificationStyle
|
|
}
|
|
|
|
response = safe_get(port, f"programs/current/functions/{address}/decompile", params)
|
|
|
|
# Format the response for consistency
|
|
if isinstance(response, dict) and "success" in response and response["success"]:
|
|
# Add timestamp if not present
|
|
if "timestamp" not in response:
|
|
response["timestamp"] = int(time.time() * 1000)
|
|
# Add port for tracking
|
|
response["port"] = port
|
|
|
|
# Ensure the result has a decompilation field for backward compatibility
|
|
if "result" in response and isinstance(response["result"], dict):
|
|
if "ccode" in response["result"] and "decompilation" not in response["result"]:
|
|
response["result"]["decompilation"] = response["result"]["ccode"]
|
|
|
|
return response
|
|
|
|
|
|
@mcp.tool()
|
|
def disassemble_function(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> dict:
|
|
"""Get disassembly for function at address
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
address: Memory address in hex format
|
|
|
|
Returns:
|
|
dict: Contains assembly instructions with addresses and comments
|
|
"""
|
|
if not address:
|
|
return {
|
|
"success": False,
|
|
"error": "Address parameter is required",
|
|
"timestamp": int(time.time() * 1000)
|
|
}
|
|
|
|
# Use the HATEOAS endpoint
|
|
response = safe_get(port, f"programs/current/functions/{address}/disassembly")
|
|
|
|
# Format the response for consistency
|
|
if isinstance(response, dict) and "success" in response and response["success"]:
|
|
# Add timestamp if not present
|
|
if "timestamp" not in response:
|
|
response["timestamp"] = int(time.time() * 1000)
|
|
# Add port for tracking
|
|
response["port"] = port
|
|
|
|
return response
|
|
|
|
|
|
@mcp.tool()
|
|
def set_decompiler_comment(port: int = DEFAULT_GHIDRA_PORT, address: str = "", comment: str = "") -> str:
|
|
"""Add/edit decompiler comment at address
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
address: Memory address in hex format
|
|
comment: Comment text to add
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
return safe_post(port, "set_decompiler_comment", {"address": address, "comment": comment})
|
|
|
|
|
|
@mcp.tool()
|
|
def set_disassembly_comment(port: int = DEFAULT_GHIDRA_PORT, address: str = "", comment: str = "") -> str:
|
|
"""Add/edit disassembly comment at address
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
address: Memory address in hex format
|
|
comment: Comment text to add
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
return safe_post(port, "set_disassembly_comment", {"address": address, "comment": comment})
|
|
|
|
|
|
@mcp.tool()
|
|
def rename_local_variable(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", old_name: str = "", new_name: str = "") -> str:
|
|
"""Rename local variable in function
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
function_address: Function memory address in hex
|
|
old_name: Current variable name
|
|
new_name: New variable name
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
return safe_post(port, "rename_local_variable", {"functionAddress": function_address, "oldName": old_name, "newName": new_name})
|
|
|
|
|
|
@mcp.tool()
|
|
def rename_function_by_address(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", new_name: str = "") -> str:
|
|
"""Rename function at memory address
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
function_address: Function memory address in hex
|
|
new_name: New function name
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
return safe_post(port, "rename_function_by_address", {"functionAddress": function_address, "newName": new_name})
|
|
|
|
|
|
@mcp.tool()
|
|
def set_function_prototype(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", prototype: str = "") -> str:
|
|
"""Update function signature/prototype
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
function_address: Function memory address in hex
|
|
prototype: New prototype string (e.g. "int func(int param1)")
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
return safe_post(port, "set_function_prototype", {"functionAddress": function_address, "prototype": prototype})
|
|
|
|
|
|
@mcp.tool()
|
|
def set_local_variable_type(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", variable_name: str = "", new_type: str = "") -> str:
|
|
"""Change local variable data type
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
function_address: Function memory address in hex
|
|
variable_name: Variable name to modify
|
|
new_type: New data type (e.g. "int", "char*")
|
|
|
|
Returns:
|
|
str: Confirmation message or error
|
|
"""
|
|
return safe_post(port, "set_local_variable_type", {"functionAddress": function_address, "variableName": variable_name, "newType": new_type})
|
|
|
|
|
|
@mcp.tool()
|
|
def list_variables(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100, search: str = "") -> dict:
|
|
"""List global variables with optional search
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
offset: Pagination offset (default: 0)
|
|
limit: Maximum items to return (default: 100)
|
|
search: Optional filter for variable names
|
|
|
|
Returns:
|
|
dict: Contains variables list in 'result' field
|
|
"""
|
|
params = {"offset": offset, "limit": limit}
|
|
if search:
|
|
params["search"] = search
|
|
|
|
return safe_get(port, "variables", params)
|
|
|
|
|
|
@mcp.tool()
|
|
def list_function_variables(port: int = DEFAULT_GHIDRA_PORT, function: str = "") -> dict:
|
|
"""List variables in function
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
function: Function name to list variables for
|
|
|
|
Returns:
|
|
dict: Contains variables list in 'result.variables'
|
|
"""
|
|
if not function:
|
|
return {"success": False, "error": "Function name is required"}
|
|
|
|
encoded_name = quote(function)
|
|
return safe_get(port, f"functions/{encoded_name}/variables", {})
|
|
|
|
|
|
@mcp.tool()
|
|
def rename_variable(port: int = DEFAULT_GHIDRA_PORT, function: str = "", name: str = "", new_name: str = "") -> dict:
|
|
"""Rename variable in function
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
function: Function name containing variable
|
|
name: Current variable name
|
|
new_name: New variable name
|
|
|
|
Returns:
|
|
dict: Operation result
|
|
"""
|
|
if not function or not name or not new_name:
|
|
return {"success": False, "error": "Function, name, and new_name parameters are required"}
|
|
|
|
encoded_function = quote(function)
|
|
encoded_var = quote(name)
|
|
return safe_post(port, f"functions/{encoded_function}/variables/{encoded_var}", {"newName": new_name})
|
|
|
|
|
|
@mcp.tool()
|
|
def retype_variable(port: int = DEFAULT_GHIDRA_PORT, function: str = "", name: str = "", data_type: str = "") -> dict:
|
|
"""Change variable data type in function
|
|
|
|
Args:
|
|
port: Ghidra instance port (default: 8192)
|
|
function: Function name containing variable
|
|
name: Variable name to modify
|
|
data_type: New data type
|
|
|
|
Returns:
|
|
dict: Operation result
|
|
"""
|
|
if not function or not name or not data_type:
|
|
return {"success": False, "error": "Function, name, and data_type parameters are required"}
|
|
|
|
encoded_function = quote(function)
|
|
encoded_var = quote(name)
|
|
return safe_post(port, f"functions/{encoded_function}/variables/{encoded_var}", {"dataType": data_type})
|
|
|
|
|
|
def handle_sigint(signum, frame):
|
|
os._exit(0)
|
|
|
|
|
|
def periodic_discovery():
|
|
"""Periodically discover new instances"""
|
|
while True:
|
|
try:
|
|
_discover_instances(FULL_DISCOVERY_RANGE, timeout=0.5)
|
|
|
|
with instances_lock:
|
|
ports_to_remove = []
|
|
for port, info in active_instances.items():
|
|
url = info["url"]
|
|
try:
|
|
response = requests.get(f"{url}/instances", timeout=1)
|
|
if not response.ok:
|
|
ports_to_remove.append(port)
|
|
except requests.exceptions.RequestException:
|
|
ports_to_remove.append(port)
|
|
|
|
for port in ports_to_remove:
|
|
del active_instances[port]
|
|
print(f"Removed unreachable instance on port {port}")
|
|
except Exception as e:
|
|
print(f"Error in periodic discovery: {e}")
|
|
|
|
time.sleep(30)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
register_instance(DEFAULT_GHIDRA_PORT,
|
|
f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}")
|
|
|
|
discover_instances()
|
|
|
|
discovery_thread = threading.Thread(
|
|
target=periodic_discovery,
|
|
daemon=True,
|
|
name="GhydraMCP-Discovery"
|
|
)
|
|
discovery_thread.start()
|
|
|
|
signal.signal(signal.SIGINT, handle_sigint)
|
|
mcp.run(transport="stdio")
|