mcghidra/bridge_mcp_ghidra.py
2025-03-28 19:45:43 +01:00

141 lines
4.1 KiB
Python

# /// script
# requires-python = ">=3.11"
# dependencies = [
# "mcp==1.5.0",
# "requests==2.32.3",
# ]
# ///
import sys
import requests
from mcp.server.fastmcp import FastMCP
DEFAULT_GHIDRA_SERVER = "http://127.0.0.1:8080/"
ghidra_server_url = sys.argv[1] if len(sys.argv) > 1 else DEFAULT_GHIDRA_SERVER
mcp = FastMCP("ghidra-mcp")
def safe_get(endpoint: str, params: dict = None) -> list:
"""
Perform a GET request with optional query parameters.
"""
if params is None:
params = {}
url = f"{ghidra_server_url}/{endpoint}"
try:
response = requests.get(url, params=params, timeout=5)
response.encoding = 'utf-8'
if response.ok:
return response.text.splitlines()
else:
return [f"Error {response.status_code}: {response.text.strip()}"]
except Exception as e:
return [f"Request failed: {str(e)}"]
def safe_post(endpoint: str, data: dict | str) -> str:
try:
if isinstance(data, dict):
response = requests.post(f"{ghidra_server_url}/{endpoint}", data=data, timeout=5)
else:
response = requests.post(f"{ghidra_server_url}/{endpoint}", data=data.encode("utf-8"), timeout=5)
response.encoding = 'utf-8'
if response.ok:
return response.text.strip()
else:
return f"Error {response.status_code}: {response.text.strip()}"
except Exception as e:
return f"Request failed: {str(e)}"
@mcp.tool()
def list_methods(offset: int = 0, limit: int = 100) -> list:
"""
List all function names in the program with pagination.
"""
return safe_get("methods", {"offset": offset, "limit": limit})
@mcp.tool()
def list_classes(offset: int = 0, limit: int = 100) -> list:
"""
List all namespace/class names in the program with pagination.
"""
return safe_get("classes", {"offset": offset, "limit": limit})
@mcp.tool()
def decompile_function(name: str) -> str:
"""
Decompile a specific function by name and return the decompiled C code.
"""
return safe_post("decompile", name)
@mcp.tool()
def rename_function(old_name: str, new_name: str) -> str:
"""
Rename a function by its current name to a new user-defined name.
"""
return safe_post("renameFunction", {"oldName": old_name, "newName": new_name})
@mcp.tool()
def rename_data(address: str, new_name: str) -> str:
"""
Rename a data label at the specified address.
"""
return safe_post("renameData", {"address": address, "newName": new_name})
@mcp.tool()
def list_segments(offset: int = 0, limit: int = 100) -> list:
"""
List all memory segments in the program with pagination.
"""
return safe_get("segments", {"offset": offset, "limit": limit})
@mcp.tool()
def list_imports(offset: int = 0, limit: int = 100) -> list:
"""
List imported symbols in the program with pagination.
"""
return safe_get("imports", {"offset": offset, "limit": limit})
@mcp.tool()
def list_exports(offset: int = 0, limit: int = 100) -> list:
"""
List exported functions/symbols with pagination.
"""
return safe_get("exports", {"offset": offset, "limit": limit})
@mcp.tool()
def list_namespaces(offset: int = 0, limit: int = 100) -> list:
"""
List all non-global namespaces in the program with pagination.
"""
return safe_get("namespaces", {"offset": offset, "limit": limit})
@mcp.tool()
def list_data_items(offset: int = 0, limit: int = 100) -> list:
"""
List defined data labels and their values with pagination.
"""
return safe_get("data", {"offset": offset, "limit": limit})
@mcp.tool()
def search_functions_by_name(query: str, offset: int = 0, limit: int = 100) -> list:
"""
Search for functions whose name contains the given substring.
"""
if not query:
return ["Error: query string is required"]
return safe_get("searchFunctions", {"query": query, "offset": offset, "limit": limit})
import signal
import os
def handle_sigint(signum, frame):
os._exit(0)
if __name__ == "__main__":
signal.signal(signal.SIGINT, handle_sigint)
mcp.run()