from mcp.server.fastmcp import FastMCP import requests ghidra_server_url = "http://localhost:8080" mcp = FastMCP("ghidra-mcp") def safe_get(endpoint: str, params: dict = None) -> list: """ Perform a GET request. If 'params' is given, we convert it to a query string. """ if params is None: params = {} qs = [f"{k}={v}" for k, v in params.items()] query_string = "&".join(qs) url = f"{ghidra_server_url}/{endpoint}" if query_string: url += "?" + query_string try: response = requests.get(url, timeout=5) response.encoding = 'utf-8' if response.ok: return response.text.splitlines() else: return [f"Error {response.status_code}: {response.text.strip()}"] except Exception as e: return [f"Request failed: {str(e)}"] def safe_post(endpoint: str, data: dict | str) -> str: try: if isinstance(data, dict): response = requests.post(f"{ghidra_server_url}/{endpoint}", data=data, timeout=5) else: response = requests.post(f"{ghidra_server_url}/{endpoint}", data=data.encode("utf-8"), timeout=5) response.encoding = 'utf-8' if response.ok: return response.text.strip() else: return f"Error {response.status_code}: {response.text.strip()}" except Exception as e: return f"Request failed: {str(e)}" @mcp.tool() def list_methods(offset: int = 0, limit: int = 100) -> list: """ List all function names in the program with pagination. """ return safe_get("methods", {"offset": offset, "limit": limit}) @mcp.tool() def list_classes(offset: int = 0, limit: int = 100) -> list: """ List all namespace/class names in the program with pagination. """ return safe_get("classes", {"offset": offset, "limit": limit}) @mcp.tool() def decompile_function(name: str) -> str: """ Decompile a specific function by name and return the decompiled C code. """ return safe_post("decompile", name) @mcp.tool() def rename_function(old_name: str, new_name: str) -> str: """ Rename a function by its current name to a new user-defined name. """ return safe_post("renameFunction", {"oldName": old_name, "newName": new_name}) @mcp.tool() def rename_data(address: str, new_name: str) -> str: """ Rename a data label at the specified address. """ return safe_post("renameData", {"address": address, "newName": new_name}) @mcp.tool() def list_segments(offset: int = 0, limit: int = 100) -> list: """ List all memory segments in the program with pagination. """ return safe_get("segments", {"offset": offset, "limit": limit}) @mcp.tool() def list_imports(offset: int = 0, limit: int = 100) -> list: """ List imported symbols in the program with pagination. """ return safe_get("imports", {"offset": offset, "limit": limit}) @mcp.tool() def list_exports(offset: int = 0, limit: int = 100) -> list: """ List exported functions/symbols with pagination. """ return safe_get("exports", {"offset": offset, "limit": limit}) @mcp.tool() def list_namespaces(offset: int = 0, limit: int = 100) -> list: """ List all non-global namespaces in the program with pagination. """ return safe_get("namespaces", {"offset": offset, "limit": limit}) @mcp.tool() def list_data_items(offset: int = 0, limit: int = 100) -> list: """ List defined data labels and their values with pagination. """ return safe_get("data", {"offset": offset, "limit": limit}) @mcp.tool() def search_functions_by_name(query: str, offset: int = 0, limit: int = 100) -> list: """ Search for functions whose name contains the given substring. """ if not query: return ["Error: query string is required"] return safe_get("searchFunctions", {"query": query, "offset": offset, "limit": limit}) if __name__ == "__main__": mcp.run()