diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index 2902111..bccb77b 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -13,8 +13,8 @@ import sys import threading import time from threading import Lock -from typing import Dict, List -from urllib.parse import quote +from typing import Dict, List, Optional, Union, Any +from urllib.parse import quote, urlencode from urllib.parse import urlparse import requests @@ -77,7 +77,9 @@ def validate_origin(headers: dict) -> bool: return origin_base in ALLOWED_ORIGINS -def _make_request(method: str, port: int, endpoint: str, params: dict = None, json_data: dict = None, data: str = None, headers: dict = None) -> dict: +def _make_request(method: str, port: int, endpoint: str, params: dict = None, + json_data: dict = None, data: str = None, + headers: dict = None) -> dict: """Internal helper to make HTTP requests and handle common errors.""" url = f"{get_instance_url(port)}/{endpoint}" request_headers = {'Accept': 'application/json'} @@ -113,6 +115,7 @@ def _make_request(method: str, port: int, endpoint: str, params: dict = None, js try: parsed_json = response.json() + # Add timestamp if not present if isinstance(parsed_json, dict) and "timestamp" not in parsed_json: parsed_json["timestamp"] = int(time.time() * 1000) return parsed_json @@ -168,7 +171,7 @@ def safe_put(port: int, endpoint: str, data: dict) -> dict: return _make_request("PUT", port, endpoint, json_data=data, headers=headers) -def safe_post(port: int, endpoint: str, data: dict | str) -> dict: +def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict: """Perform a POST request to a specific Ghidra instance with JSON or text payload""" headers = None json_payload = None @@ -188,13 +191,83 @@ def safe_patch(port: int, endpoint: str, data: dict) -> dict: headers = data.pop("headers", None) if isinstance(data, dict) else None return _make_request("PATCH", port, endpoint, json_data=data, headers=headers) -# Instance management tools +def safe_delete(port: int, endpoint: str) -> dict: + """Perform a DELETE request to a specific Ghidra instance""" + return _make_request("DELETE", port, endpoint) + + +def simplify_response(response: dict) -> dict: + """ + Simplify HATEOAS response data for easier AI agent consumption + - Removes _links from result entries + - Flattens nested structures when appropriate + - Preserves important metadata + - Converts structured data like disassembly to text for easier consumption + """ + if not isinstance(response, dict): + return response + + # Make a copy to avoid modifying the original + result = response.copy() + + # Simplify the main result data if present + if "result" in result: + # Handle array results + if isinstance(result["result"], list): + simplified_items = [] + for item in result["result"]: + if isinstance(item, dict): + # Remove HATEOAS links from individual items + item_copy = item.copy() + item_copy.pop("_links", None) + simplified_items.append(item_copy) + else: + simplified_items.append(item) + result["result"] = simplified_items + + # Handle object results + elif isinstance(result["result"], dict): + result_copy = result["result"].copy() + # Remove links from result object + result_copy.pop("_links", None) + + # Special case for disassembly - convert to text for easier consumption + if "instructions" in result_copy and isinstance(result_copy["instructions"], list): + disasm_text = "" + for instr in result_copy["instructions"]: + if isinstance(instr, dict): + addr = instr.get("address", "") + mnemonic = instr.get("mnemonic", "") + operands = instr.get("operands", "") + bytes_str = instr.get("bytes", "") + + # Format: address: bytes mnemonic operands + disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" + + # Add the text representation while preserving the original structured data + result_copy["disassembly_text"] = disasm_text + + # Special case for decompiled code - make sure it's directly accessible + if "ccode" in result_copy: + result_copy["decompiled_text"] = result_copy["ccode"] + elif "decompiled" in result_copy: + result_copy["decompiled_text"] = result_copy["decompiled"] + + result["result"] = result_copy + + # Remove HATEOAS links from the top level + result.pop("_links", None) + + return result + + +# Instance management tools @mcp.tool() def list_instances() -> dict: """List all active Ghidra instances - + Returns: dict: Contains 'instances' list with port, url, project and file info for each instance """ @@ -215,11 +288,11 @@ def list_instances() -> dict: @mcp.tool() def register_instance(port: int, url: str = None) -> str: """Register a new Ghidra instance - + Args: port: Port number of the Ghidra instance url: Optional URL if different from default http://host:port - + Returns: str: Confirmation message or error """ @@ -227,59 +300,37 @@ def register_instance(port: int, url: str = None) -> str: url = f"http://{ghidra_host}:{port}" try: - test_url = f"{url}/instances" + # Check for HATEOAS API by checking plugin-version endpoint + test_url = f"{url}/plugin-version" response = requests.get(test_url, timeout=2) + if not response.ok: - return f"Error: Instance at {url} is not responding properly" + return f"Error: Instance at {url} is not responding properly to HATEOAS API" project_info = {"url": url} try: - root_url = f"{url}/" - root_response = requests.get( - root_url, timeout=1.5) # Short timeout for root - - if root_response.ok: - try: - root_data = root_response.json() - - if "project" in root_data and root_data["project"]: - project_info["project"] = root_data["project"] - if "file" in root_data and root_data["file"]: - project_info["file"] = root_data["file"] - - except Exception as e: - print(f"Error parsing root info: {e}", file=sys.stderr) - - if not project_info.get("project") and not project_info.get("file"): - info_url = f"{url}/info" - - try: - info_response = requests.get(info_url, timeout=2) - if info_response.ok: - try: - info_data = info_response.json() - if "project" in info_data and info_data["project"]: - project_info["project"] = info_data["project"] - - file_info = info_data.get("file", {}) - if isinstance(file_info, dict) and file_info.get("name"): - project_info["file"] = file_info.get( - "name", "") - project_info["path"] = file_info.get( - "path", "") - project_info["architecture"] = file_info.get( - "architecture", "") - project_info["endian"] = file_info.get( - "endian", "") - print( - f"Info data parsed: {project_info}", file=sys.stderr) - except Exception as e: - print( - f"Error parsing info endpoint: {e}", file=sys.stderr) - except Exception as e: - print( - f"Error connecting to info endpoint: {e}", file=sys.stderr) + # Get program info from HATEOAS API + info_url = f"{url}/program" + + try: + info_response = requests.get(info_url, timeout=2) + if info_response.ok: + try: + info_data = info_response.json() + if "result" in info_data: + result = info_data["result"] + if isinstance(result, dict): + project_info["project"] = result.get("project", "") + project_info["file"] = result.get("name", "") + project_info["path"] = result.get("path", "") + project_info["language_id"] = result.get("language_id", "") + project_info["compiler_spec_id"] = result.get("compiler_spec_id", "") + project_info["image_base"] = result.get("image_base", "") + except Exception as e: + print(f"Error parsing info endpoint: {e}", file=sys.stderr) + except Exception as e: + print(f"Error connecting to info endpoint: {e}", file=sys.stderr) except Exception: # Non-critical, continue with registration even if project info fails pass @@ -295,10 +346,10 @@ def register_instance(port: int, url: str = None) -> str: @mcp.tool() def unregister_instance(port: int) -> str: """Unregister a Ghidra instance - + Args: port: Port number of the instance to unregister - + Returns: str: Confirmation message or error """ @@ -312,10 +363,10 @@ def unregister_instance(port: int) -> str: @mcp.tool() def discover_instances(host: str = None) -> dict: """Discover available Ghidra instances by scanning ports - + Args: host: Optional host to scan (default: configured ghidra_host) - + Returns: dict: Contains 'found' count and 'instances' list with discovery results """ @@ -333,7 +384,8 @@ def _discover_instances(port_range, host=None, timeout=0.5) -> dict: url = f"http://{scan_host}:{port}" try: - test_url = f"{url}/instances" + # Try HATEOAS API via plugin-version endpoint + test_url = f"{url}/plugin-version" response = requests.get(test_url, timeout=timeout) if response.ok: result = register_instance(port, url) @@ -350,62 +402,32 @@ def _discover_instances(port_range, host=None, timeout=0.5) -> dict: @mcp.tool() -def list_programs(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100, - project: str = None) -> dict: - """List all programs across all projects with filtering and pagination - +def get_plugin_version(port: int = DEFAULT_GHIDRA_PORT) -> dict: + """Get version information for the Ghidra plugin + Args: port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - project: Filter by project name - + Returns: - dict: { - "result": list of program info objects, - "size": total count, - "offset": current offset, - "limit": current limit, - "_links": pagination links - } + dict: Plugin and API version information """ - params = { - "offset": offset, - "limit": limit - } - if project: - params["project"] = project - - response = safe_get(port, "programs", params) - if isinstance(response, dict) and "error" in response: - return response - - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) - } + response = safe_get(port, "plugin-version") + return simplify_response(response) @mcp.tool() -def get_current_program(port: int = DEFAULT_GHIDRA_PORT) -> dict: - """Get details about the currently loaded program - +def get_program_info(port: int = DEFAULT_GHIDRA_PORT) -> dict: + """Get detailed information about the current program + Args: port: Ghidra instance port (default: 8192) - + Returns: - dict: Program information including name, ID, language, etc. + dict: Contains metadata about the current program including name, + architecture, memory layout, compiler, etc. """ - response = safe_get(port, "programs/current") - if isinstance(response, dict) and "error" in response: - return response - - return response + response = safe_get(port, "program") + return simplify_response(response) @mcp.tool() @@ -417,7 +439,7 @@ def list_functions(port: int = DEFAULT_GHIDRA_PORT, name_contains: str = None, name_matches_regex: str = None) -> dict: """List functions in the current program with filtering and pagination - + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) @@ -426,14 +448,13 @@ def list_functions(port: int = DEFAULT_GHIDRA_PORT, name: Exact name match filter (case-sensitive) name_contains: Substring name filter (case-insensitive) name_matches_regex: Regex name filter - + Returns: dict: { "result": list of function info objects, "size": total count, "offset": current offset, - "limit": current limit, - "_links": pagination links + "limit": current limit } """ params = { @@ -449,107 +470,200 @@ def list_functions(port: int = DEFAULT_GHIDRA_PORT, if name_matches_regex: params["name_matches_regex"] = name_matches_regex - response = safe_get(port, "programs/current/functions", params) - if isinstance(response, dict) and "error" in response: - return response + response = safe_get(port, "functions", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified - # Transform to expected format if needed - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) + +@mcp.tool() +def get_function(port: int = DEFAULT_GHIDRA_PORT, + address: str = None, + name: str = None) -> dict: + """Get details for a function by address or name + + Args: + port: Ghidra instance port (default: 8192) + address: Function address in hex format (mutually exclusive with name) + name: Function name (mutually exclusive with address) + + Returns: + dict: Contains function name, address, signature and other details + """ + if not address and not name: + return { + "success": False, + "error": "Either address or name parameter is required", + "timestamp": int(time.time() * 1000) + } + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_get(port, endpoint) + return simplify_response(response) + + +@mcp.tool() +def decompile_function(port: int = DEFAULT_GHIDRA_PORT, + address: str = None, + name: str = None, + syntax_tree: bool = False, + style: str = "normalize") -> dict: + """Get decompiled code for a function by address or name + + Args: + port: Ghidra instance port (default: 8192) + address: Function address in hex format (mutually exclusive with name) + name: Function name (mutually exclusive with address) + syntax_tree: Include syntax tree (default: False) + style: Decompiler style (default: "normalize") + + Returns: + dict: Contains function information and decompiled code + """ + if not address and not name: + return { + "success": False, + "error": "Either address or name parameter is required", + "timestamp": int(time.time() * 1000) + } + + params = { + "syntax_tree": str(syntax_tree).lower(), + "style": style } + + if address: + endpoint = f"functions/{address}/decompile" + else: + endpoint = f"functions/by-name/{quote(name)}/decompile" + + response = safe_get(port, endpoint, params) + simplified = simplify_response(response) + + # For AI consumption, make the decompiled code more directly accessible + if "result" in simplified and isinstance(simplified["result"], dict): + if "decompiled" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["decompiled"] + elif "ccode" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["ccode"] + elif "decompiled_text" in simplified["result"]: + simplified["decompiled_code"] = simplified["result"]["decompiled_text"] + + return simplified @mcp.tool() -def list_classes(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list: - """List classes in the current program with pagination - +def disassemble_function(port: int = DEFAULT_GHIDRA_PORT, + address: str = None, + name: str = None) -> dict: + """Get disassembly for a function by address or name + Args: port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - + address: Function address in hex format (mutually exclusive with name) + name: Function name (mutually exclusive with address) + Returns: - list: Class names and info + dict: Contains function information and disassembly text """ - return safe_get(port, "classes", {"offset": offset, "limit": limit}) + if not address and not name: + return { + "success": False, + "error": "Either address or name parameter is required", + "timestamp": int(time.time() * 1000) + } + + if address: + endpoint = f"functions/{address}/disassembly" + else: + endpoint = f"functions/by-name/{quote(name)}/disassembly" + + response = safe_get(port, endpoint) + simplified = simplify_response(response) + + # For AI consumption, add a plain text version of the disassembly if not already present + if "result" in simplified and isinstance(simplified["result"], dict): + if "instructions" in simplified["result"] and isinstance(simplified["result"]["instructions"], list): + if "disassembly_text" not in simplified["result"]: + instr_list = simplified["result"]["instructions"] + disasm_text = "" + for instr in instr_list: + if isinstance(instr, dict): + addr = instr.get("address", "") + mnemonic = instr.get("mnemonic", "") + operands = instr.get("operands", "") + bytes_str = instr.get("bytes", "") + + # Format: address: bytes mnemonic operands + disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" + + simplified["result"]["disassembly_text"] = disasm_text + # Also make it more directly accessible + simplified["disassembly_text"] = disasm_text + + return simplified @mcp.tool() -def get_function(port: int = DEFAULT_GHIDRA_PORT, name: str = "", cCode: bool = True, syntaxTree: bool = False, simplificationStyle: str = "normalize") -> dict: - """Get decompiled code for a function - +def get_function_variables(port: int = DEFAULT_GHIDRA_PORT, + address: str = None, + name: str = None) -> dict: + """Get variables for a function by address or name + Args: port: Ghidra instance port (default: 8192) - name: Function name to decompile - cCode: Return C-style code (default: True) - syntaxTree: Include syntax tree (default: False) - simplificationStyle: Decompiler style (default: "normalize") - + address: Function address in hex format (mutually exclusive with name) + name: Function name (mutually exclusive with address) + Returns: - dict: Contains function name, address, signature and decompilation + dict: Contains function information and list of variables """ - return safe_get(port, f"functions/{quote(name)}", { - "cCode": str(cCode).lower(), - "syntaxTree": str(syntaxTree).lower(), - "simplificationStyle": simplificationStyle - }) - - -@mcp.tool() -def update_function(port: int = DEFAULT_GHIDRA_PORT, name: str = "", new_name: str = "") -> str: - """Rename a function - - Args: - port: Ghidra instance port (default: 8192) - name: Current function name - new_name: New function name - - Returns: - str: Confirmation message or error - """ - return safe_post(port, f"functions/{quote(name)}", {"newName": new_name}) - - -@mcp.tool() -def update_data(port: int = DEFAULT_GHIDRA_PORT, address: str = "", new_name: str = "") -> str: - """Rename data at a memory address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - new_name: New name for the data - - Returns: - str: Confirmation message or error - """ - return safe_post(port, "data", {"address": address, "newName": new_name}) + if not address and not name: + return { + "success": False, + "error": "Either address or name parameter is required", + "timestamp": int(time.time() * 1000) + } + + if address: + endpoint = f"functions/{address}/variables" + else: + endpoint = f"functions/by-name/{quote(name)}/variables" + + response = safe_get(port, endpoint) + return simplify_response(response) @mcp.tool() def list_segments(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100, - name: str = None) -> dict: + offset: int = 0, + limit: int = 100, + name: str = None) -> dict: """List memory segments with filtering and pagination - + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) limit: Maximum items to return (default: 100) name: Filter by segment name (case-sensitive substring match) - + Returns: dict: { "result": list of segment objects with properties including name, start, end, size, permissions (readable, writable, executable), and initialized status, "size": total count of segments matching the filter, "offset": current offset in pagination, - "limit": current limit for pagination, - "_links": pagination links for HATEOAS navigation + "limit": current limit for pagination } """ params = { @@ -559,29 +673,28 @@ def list_segments(port: int = DEFAULT_GHIDRA_PORT, if name: params["name"] = name - response = safe_get(port, "programs/current/segments", params) - if isinstance(response, dict) and "error" in response: - return response - - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) - } + response = safe_get(port, "segments", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified @mcp.tool() def list_symbols(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100, - addr: str = None, - name: str = None, - name_contains: str = None, - type: str = None) -> dict: + offset: int = 0, + limit: int = 100, + addr: str = None, + name: str = None, + name_contains: str = None, + type: str = None) -> dict: """List symbols with filtering and pagination - + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) @@ -590,14 +703,13 @@ def list_symbols(port: int = DEFAULT_GHIDRA_PORT, name: Exact name match filter (case-sensitive) name_contains: Substring name filter (case-insensitive) type: Filter by symbol type (e.g. "function", "data", "label") - + Returns: dict: { "result": list of symbol objects, "size": total count, "offset": current offset, - "limit": current limit, - "_links": pagination links + "limit": current limit } """ params = { @@ -613,108 +725,28 @@ def list_symbols(port: int = DEFAULT_GHIDRA_PORT, if type: params["type"] = type - response = safe_get(port, "programs/current/symbols", params) - if isinstance(response, dict) and "error" in response: - return response - - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) - } - - -@mcp.tool() -def list_imports(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> dict: - """List imported symbols with pagination - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - - Returns: - dict: { - "result": list of imported symbols, - "size": total count, - "offset": current offset, - "limit": current limit, - "_links": pagination links - } - """ - response = safe_get(port, "programs/current/symbols/imports", - {"offset": offset, "limit": limit}) - if isinstance(response, dict) and "error" in response: - return response - - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) - } - - -@mcp.tool() -def list_exports(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> dict: - """List exported symbols with pagination - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - - Returns: - dict: { - "result": list of exported symbols, - "size": total count, - "offset": current offset, - "limit": current limit, - "_links": pagination links - } - """ - response = safe_get(port, "programs/current/symbols/exports", - {"offset": offset, "limit": limit}) - if isinstance(response, dict) and "error" in response: - return response - - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) - } - - -@mcp.tool() -def list_namespaces(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100) -> list: - """List namespaces with pagination - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - - Returns: - list: Namespace information strings - """ - return safe_get(port, "namespaces", {"offset": offset, "limit": limit}) + response = safe_get(port, "symbols", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified @mcp.tool() def list_data_items(port: int = DEFAULT_GHIDRA_PORT, - offset: int = 0, - limit: int = 100, - addr: str = None, - name: str = None, - name_contains: str = None, - type: str = None) -> dict: + offset: int = 0, + limit: int = 100, + addr: str = None, + name: str = None, + name_contains: str = None, + type: str = None) -> dict: """List defined data items with filtering and pagination - + Args: port: Ghidra instance port (default: 8192) offset: Pagination offset (default: 0) @@ -723,14 +755,13 @@ def list_data_items(port: int = DEFAULT_GHIDRA_PORT, name: Exact name match filter (case-sensitive) name_contains: Substring name filter (case-insensitive) type: Filter by data type (e.g. "string", "dword") - + Returns: dict: { "result": list of data item objects, "size": total count, "offset": current offset, - "limit": current limit, - "_links": pagination links + "limit": current limit } """ params = { @@ -746,50 +777,31 @@ def list_data_items(port: int = DEFAULT_GHIDRA_PORT, if type: params["type"] = type - response = safe_get(port, "programs/current/data", params) - if isinstance(response, dict) and "error" in response: - return response - - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) - } - - -@mcp.tool() -def search_functions_by_name(port: int = DEFAULT_GHIDRA_PORT, query: str = "", offset: int = 0, limit: int = 100) -> list: - """Search functions by name with pagination - - Args: - port: Ghidra instance port (default: 8192) - query: Search string for function names - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - - Returns: - list: Matching function info or error if query empty - """ - if not query: - return ["Error: query string is required"] - return safe_get(port, "functions", {"query": query, "offset": offset, "limit": limit}) + response = safe_get(port, "data", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified @mcp.tool() def read_memory(port: int = DEFAULT_GHIDRA_PORT, - address: str = "", - length: int = 16, - format: str = "hex") -> dict: + address: str = "", + length: int = 16, + format: str = "hex") -> dict: """Read bytes from memory - + Args: port: Ghidra instance port (default: 8192) address: Memory address in hex format length: Number of bytes to read (default: 16) format: Output format - "hex", "base64", or "string" (default: "hex") - + Returns: dict: { "address": original address, @@ -806,65 +818,184 @@ def read_memory(port: int = DEFAULT_GHIDRA_PORT, "timestamp": int(time.time() * 1000) } - response = safe_get(port, f"programs/current/memory/{address}", { + params = { "length": length, "format": format - }) - - if isinstance(response, dict) and "error" in response: - return response - - return { - "address": address, - "length": length, - "format": format, - "bytes": response.get("result", {}).get("bytes", ""), - "timestamp": response.get("timestamp", int(time.time() * 1000)) } + + response = safe_get(port, f"memory/{address}", params) + simplified = simplify_response(response) + + # Ensure the result is simple and directly usable + if "result" in simplified and isinstance(simplified["result"], dict): + bytes_data = simplified["result"].get("bytes", "") + memory_info = { + "address": address, + "length": length, + "format": format, + "bytes": bytes_data, + "timestamp": simplified.get("timestamp", int(time.time() * 1000)) + } + return memory_info + + return simplified @mcp.tool() def write_memory(port: int = DEFAULT_GHIDRA_PORT, - address: str = "", - bytes: str = "", - format: str = "hex") -> dict: + address: str = "", + bytes_data: str = "", + format: str = "hex") -> dict: """Write bytes to memory (use with caution) - + Args: port: Ghidra instance port (default: 8192) address: Memory address in hex format - bytes: Data to write (format depends on 'format' parameter) + bytes_data: Data to write (format depends on 'format' parameter) format: Input format - "hex", "base64", or "string" (default: "hex") - + Returns: dict: Operation result with success status containing: - address: the target memory address - length: number of bytes written - bytesWritten: confirmation of bytes written """ - if not address or not bytes: + if not address or not bytes_data: return { "success": False, "error": "Address and bytes parameters are required", "timestamp": int(time.time() * 1000) } - return safe_post(port, f"programs/current/memory/{address}", { - "bytes": bytes, + payload = { + "bytes": bytes_data, "format": format - }) + } + + response = safe_patch(port, f"memory/{address}", payload) + return simplify_response(response) @mcp.tool() -def get_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> dict: - """Get function details by memory address - +def list_xrefs(port: int = DEFAULT_GHIDRA_PORT, + to_addr: str = None, + from_addr: str = None, + type: str = None, + offset: int = 0, + limit: int = 100) -> dict: + """List cross-references with filtering and pagination + Args: port: Ghidra instance port (default: 8192) - address: Memory address in hex format - + to_addr: Filter references to this address (hexadecimal) + from_addr: Filter references from this address (hexadecimal) + type: Filter by reference type (e.g. "CALL", "READ", "WRITE") + offset: Pagination offset (default: 0) + limit: Maximum items to return (default: 100) + Returns: - dict: Contains function name, address, signature and decompilation + dict: { + "result": list of xref objects with from_addr, to_addr, type, from_function, to_function fields, + "size": total number of xrefs matching the filter, + "offset": current offset for pagination, + "limit": current limit for pagination + } + """ + params = { + "offset": offset, + "limit": limit + } + if to_addr: + params["to_addr"] = to_addr + if from_addr: + params["from_addr"] = from_addr + if type: + params["type"] = type + + response = safe_get(port, "xrefs", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + # For AI consumption, make the references more directly accessible + if "result" in simplified and isinstance(simplified["result"], dict) and "references" in simplified["result"]: + simplified["xrefs"] = simplified["result"]["references"] + + return simplified + + +@mcp.tool() +def get_current_address(port: int = DEFAULT_GHIDRA_PORT) -> dict: + """Get the address currently selected in Ghidra's UI + + Args: + port: Ghidra instance port (default: 8192) + + Returns: + Dict containing: + - success: boolean indicating success + - result: object with address field + - error: error message if failed + - timestamp: timestamp of response + """ + response = safe_get(port, "address") + return simplify_response(response) + + +@mcp.tool() +def get_current_function(port: int = DEFAULT_GHIDRA_PORT) -> dict: + """Get the function currently selected in Ghidra's UI + + Args: + port: Ghidra instance port (default: 8192) + + Returns: + Dict containing: + - success: boolean indicating success + - result: object with name, address and signature fields + - error: error message if failed + - timestamp: timestamp of response + """ + response = safe_get(port, "function") + return simplify_response(response) + + +@mcp.tool() +def analyze_program(port: int = DEFAULT_GHIDRA_PORT, + analysis_options: dict = None) -> dict: + """Run analysis on the current program + + Args: + port: Ghidra instance port (default: 8192) + analysis_options: Dictionary of analysis options to enable/disable + (e.g. {"functionRecovery": True, "dataRefs": False}) + None means use default analysis options + + Returns: + dict: Analysis operation result with status containing: + - program: program name + - analysis_triggered: boolean indicating if analysis was successfully started + - message: status message + """ + response = safe_post(port, "analysis", analysis_options or {}) + return simplify_response(response) + + +@mcp.tool() +def create_function(port: int = DEFAULT_GHIDRA_PORT, + address: str = "") -> dict: + """Create a new function at the specified address + + Args: + port: Ghidra instance port (default: 8192) + address: Memory address in hex format where function starts + + Returns: + dict: Operation result with the created function information """ if not address: return { @@ -873,178 +1004,239 @@ def get_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "") "timestamp": int(time.time() * 1000) } - # Use the HATEOAS endpoint - response = safe_get(port, f"programs/current/functions/{address}") + payload = { + "address": address + } - # Format the response for consistency - if isinstance(response, dict) and "success" in response and response["success"]: - # Add timestamp if not present - if "timestamp" not in response: - response["timestamp"] = int(time.time() * 1000) - # Add port for tracking - response["port"] = port - - return response + response = safe_post(port, "functions", payload) + return simplify_response(response) @mcp.tool() -def get_current_address(port: int = DEFAULT_GHIDRA_PORT) -> dict: - """Get the address currently selected in Ghidra's UI - +def rename_function(port: int = DEFAULT_GHIDRA_PORT, + address: str = None, + name: str = None, + new_name: str = "") -> dict: + """Rename a function + Args: port: Ghidra instance port (default: 8192) - - Returns: - Dict containing: - - success: boolean indicating success - - result: object with address field - - error: error message if failed - - timestamp: timestamp of response - """ - # Use ONLY the new HATEOAS endpoint - response = safe_get(port, "programs/current/address") - if isinstance(response, dict) and "success" in response: - return response + address: Function address in hex format (mutually exclusive with name) + name: Current function name (mutually exclusive with address) + new_name: New function name - return { - "success": False, - "error": "Failed to get current address", - "timestamp": int(time.time() * 1000), - "port": port + Returns: + dict: Operation result with the updated function information + """ + if not (address or name) or not new_name: + return { + "success": False, + "error": "Either address or name, and new_name parameters are required", + "timestamp": int(time.time() * 1000) + } + + payload = { + "name": new_name } + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_patch(port, endpoint, payload) + return simplify_response(response) @mcp.tool() -def list_xrefs(port: int = DEFAULT_GHIDRA_PORT, - to_addr: str = None, - from_addr: str = None, - type: str = None, - offset: int = 0, - limit: int = 100) -> dict: - """List cross-references with filtering and pagination - +def set_function_signature(port: int = DEFAULT_GHIDRA_PORT, + address: str = None, + name: str = None, + signature: str = "") -> dict: + """Set function signature/prototype + + Args: + port: Ghidra instance port (default: 8192) + address: Function address in hex format (mutually exclusive with name) + name: Function name (mutually exclusive with address) + signature: New function signature (e.g., "int func(char *data, int size)") + + Returns: + dict: Operation result with the updated function information + """ + if not (address or name) or not signature: + return { + "success": False, + "error": "Either address or name, and signature parameters are required", + "timestamp": int(time.time() * 1000) + } + + payload = { + "signature": signature + } + + if address: + endpoint = f"functions/{address}" + else: + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_patch(port, endpoint, payload) + return simplify_response(response) + + +@mcp.tool() +def rename_variable(port: int = DEFAULT_GHIDRA_PORT, + function_address: str = None, + function_name: str = None, + variable_name: str = "", + new_name: str = "") -> dict: + """Rename a variable in a function + + Args: + port: Ghidra instance port (default: 8192) + function_address: Function address in hex format (mutually exclusive with function_name) + function_name: Function name (mutually exclusive with function_address) + variable_name: Current variable name + new_name: New variable name + + Returns: + dict: Operation result with the updated variable information + """ + if not (function_address or function_name) or not variable_name or not new_name: + return { + "success": False, + "error": "Function identifier (address or name), variable_name, and new_name parameters are required", + "timestamp": int(time.time() * 1000) + } + + payload = { + "name": new_name + } + + if function_address: + endpoint = f"functions/{function_address}/variables/{variable_name}" + else: + endpoint = f"functions/by-name/{quote(function_name)}/variables/{variable_name}" + + response = safe_patch(port, endpoint, payload) + return simplify_response(response) + + +@mcp.tool() +def set_variable_type(port: int = DEFAULT_GHIDRA_PORT, + function_address: str = None, + function_name: str = None, + variable_name: str = "", + data_type: str = "") -> dict: + """Change the data type of a variable in a function + + Args: + port: Ghidra instance port (default: 8192) + function_address: Function address in hex format (mutually exclusive with function_name) + function_name: Function name (mutually exclusive with function_address) + variable_name: Variable name + data_type: New data type (e.g. "int", "char *") + + Returns: + dict: Operation result with the updated variable information + """ + if not (function_address or function_name) or not variable_name or not data_type: + return { + "success": False, + "error": "Function identifier (address or name), variable_name, and data_type parameters are required", + "timestamp": int(time.time() * 1000) + } + + payload = { + "data_type": data_type + } + + if function_address: + endpoint = f"functions/{function_address}/variables/{variable_name}" + else: + endpoint = f"functions/by-name/{quote(function_name)}/variables/{variable_name}" + + response = safe_patch(port, endpoint, payload) + return simplify_response(response) + + +@mcp.tool() +def create_data(port: int = DEFAULT_GHIDRA_PORT, + address: str = "", + data_type: str = "", + size: int = None) -> dict: + """Define a new data item at the specified address + + Args: + port: Ghidra instance port (default: 8192) + address: Memory address in hex format + data_type: Data type (e.g. "string", "dword", "byte") + size: Optional size in bytes for the data item + + Returns: + dict: Operation result with the created data information + """ + if not address or not data_type: + return { + "success": False, + "error": "Address and data_type parameters are required", + "timestamp": int(time.time() * 1000) + } + + payload = { + "address": address, + "type": data_type + } + + if size is not None: + payload["size"] = size + + response = safe_post(port, "data", payload) + return simplify_response(response) + + +@mcp.tool() +def list_namespaces(port: int = DEFAULT_GHIDRA_PORT, + offset: int = 0, + limit: int = 100) -> dict: + """List namespaces with pagination + Args: port: Ghidra instance port (default: 8192) - to_addr: Filter references to this address (hexadecimal) - from_addr: Filter references from this address (hexadecimal) - type: Filter by reference type (e.g. "CALL", "READ", "WRITE") offset: Pagination offset (default: 0) limit: Maximum items to return (default: 100) - + Returns: - dict: { - "result": list of xref objects, - "size": total count, - "offset": current offset, - "limit": current limit, - "_links": pagination links - } + dict: Contains list of namespaces with pagination information """ params = { "offset": offset, "limit": limit } - if to_addr: - params["to_addr"] = to_addr - if from_addr: - params["from_addr"] = from_addr - if type: - params["type"] = type - - response = safe_get(port, "programs/current/xrefs", params) - if isinstance(response, dict) and "error" in response: - return response - - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) - } - - -@mcp.tool() -def list_xrefs(port: int = DEFAULT_GHIDRA_PORT, - to_addr: str = None, - from_addr: str = None, - type: str = None, - offset: int = 0, - limit: int = 100) -> dict: - """List cross-references with filtering and pagination - - Args: - port: Ghidra instance port (default: 8192) - to_addr: Filter references to this address (hexadecimal) - from_addr: Filter references from this address (hexadecimal) - type: Filter by reference type (e.g. "CALL", "READ", "WRITE") - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - - Returns: - dict: { - "result": list of xref objects with from_addr, to_addr, type, from_function, to_function fields, - "size": total number of xrefs matching the filter, - "offset": current offset for pagination, - "limit": current limit for pagination, - "_links": pagination links for HATEOAS navigation - } - """ - params = { - "offset": offset, - "limit": limit - } - if to_addr: - params["to_addr"] = to_addr - if from_addr: - params["from_addr"] = from_addr - if type: - params["type"] = type - - response = safe_get(port, "programs/current/xrefs", params) - if isinstance(response, dict) and "error" in response: - return response - - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) - } - - -@mcp.tool() -def analyze_program(port: int = DEFAULT_GHIDRA_PORT, - analysis_options: dict = None) -> dict: - """Run analysis on the current program - - Args: - port: Ghidra instance port (default: 8192) - analysis_options: Dictionary of analysis options to enable/disable - (e.g. {"functionRecovery": True, "dataRefs": False}) - None means use default analysis options - - Returns: - dict: Analysis operation result with status containing: - - program: program name - - analysis_triggered: boolean indicating if analysis was successfully started - - message: status message - """ - return safe_post(port, "programs/current/analysis", analysis_options or {}) + + response = safe_get(port, "namespaces", params) + simplified = simplify_response(response) + + # Ensure we maintain pagination metadata + if isinstance(simplified, dict) and "error" not in simplified: + simplified.setdefault("size", len(simplified.get("result", []))) + simplified.setdefault("offset", offset) + simplified.setdefault("limit", limit) + + return simplified @mcp.tool() def get_callgraph(port: int = DEFAULT_GHIDRA_PORT, - function: str = None, - max_depth: int = 3) -> dict: + function: str = None, + max_depth: int = 3) -> dict: """Get function call graph visualization data - + Args: port: Ghidra instance port (default: 8192) - function: Starting function name (None starts from entry point) + function: Starting function name or address (None starts from entry point) max_depth: Maximum call depth to analyze (default: 3) - + Returns: dict: Graph data with: - root: name of the starting function @@ -1056,380 +1248,79 @@ def get_callgraph(port: int = DEFAULT_GHIDRA_PORT, params = {"max_depth": max_depth} if function: params["function"] = function - - return safe_get(port, "programs/current/analysis/callgraph", params) - - + + response = safe_get(port, "analysis/callgraph", params) + return simplify_response(response) @mcp.tool() def get_dataflow(port: int = DEFAULT_GHIDRA_PORT, - address: str = "", - direction: str = "forward", - max_steps: int = 50) -> dict: + address: str = "", + direction: str = "forward", + max_steps: int = 50) -> dict: """Perform data flow analysis from an address - + Args: port: Ghidra instance port (default: 8192) address: Starting address in hex format direction: "forward" or "backward" (default: "forward") max_steps: Maximum analysis steps (default: 50) - + Returns: dict: Data flow analysis results """ - return safe_get(port, "programs/current/analysis/dataflow", { + if not address: + return { + "success": False, + "error": "Address parameter is required", + "timestamp": int(time.time() * 1000) + } + + params = { "address": address, "direction": direction, "max_steps": max_steps - }) + } + + response = safe_get(port, "analysis/dataflow", params) + return simplify_response(response) @mcp.tool() -def get_current_function(port: int = DEFAULT_GHIDRA_PORT) -> dict: - """Get the function currently selected in Ghidra's UI - +def set_comment(port: int = DEFAULT_GHIDRA_PORT, + address: str = "", + comment: str = "", + comment_type: str = "plate") -> dict: + """Set a comment at the specified address + Args: port: Ghidra instance port (default: 8192) - - Returns: - Dict containing: - - success: boolean indicating success - - result: object with name, address and signature fields - - error: error message if failed - - timestamp: timestamp of response - """ - # Use ONLY the new HATEOAS endpoint - response = safe_get(port, "programs/current/function") - if isinstance(response, dict) and "success" in response: - return response + address: Memory address in hex format + comment: Comment text + comment_type: Type of comment - + "plate" (disassembly), + "pre" (pre-function), + "post" (post-function), + "eol" (end of line), + "repeatable" (shows each time referenced) + (default: "plate") - return { - "success": False, - "error": "Failed to get current function", - "timestamp": int(time.time() * 1000), - "port": port - } - - -@mcp.tool() -def decompile_function_by_address(port: int = DEFAULT_GHIDRA_PORT, address: str = "", cCode: bool = True, syntaxTree: bool = False, simplificationStyle: str = "normalize") -> dict: - """Decompile function at memory address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - cCode: Return C-style code (default: True) - syntaxTree: Include syntax tree (default: False) - simplificationStyle: Decompiler style (default: "normalize") - - Returns: - dict: Contains decompiled code and function information - """ - if not address: - return { - "success": False, - "error": "Address parameter is required", - "timestamp": int(time.time() * 1000) - } - - # Use the HATEOAS endpoint - params = { - "syntax_tree": str(syntaxTree).lower(), - "style": simplificationStyle - } - - response = safe_get(port, f"programs/current/functions/{address}/decompile", params) - - # Format the response for consistency - if isinstance(response, dict) and "success" in response and response["success"]: - # Add timestamp if not present - if "timestamp" not in response: - response["timestamp"] = int(time.time() * 1000) - # Add port for tracking - response["port"] = port - - # Ensure the result has a decompilation field for backward compatibility - if "result" in response and isinstance(response["result"], dict): - if "ccode" in response["result"] and "decompilation" not in response["result"]: - response["result"]["decompilation"] = response["result"]["ccode"] - - return response - - -@mcp.tool() -def disassemble_function(port: int = DEFAULT_GHIDRA_PORT, address: str = "") -> dict: - """Get disassembly for function at address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - - Returns: - dict: Contains assembly instructions with addresses and comments - """ - if not address: - return { - "success": False, - "error": "Address parameter is required", - "timestamp": int(time.time() * 1000) - } - - # Use the HATEOAS endpoint - response = safe_get(port, f"programs/current/functions/{address}/disassembly") - - # Format the response for consistency - if isinstance(response, dict) and "success" in response and response["success"]: - # Add timestamp if not present - if "timestamp" not in response: - response["timestamp"] = int(time.time() * 1000) - # Add port for tracking - response["port"] = port - - return response - - -@mcp.tool() -def set_decompiler_comment(port: int = DEFAULT_GHIDRA_PORT, address: str = "", comment: str = "") -> dict: - """Add/edit decompiler comment at address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - comment: Comment text to add - - Returns: - dict: Operation result with success status - """ - if not address: - return { - "success": False, - "error": "Address parameter is required", - "timestamp": int(time.time() * 1000) - } - - # Use the HATEOAS endpoint - return safe_post(port, f"programs/current/memory/{address}/comments/decompiler", { - "comment": comment - }) - - -@mcp.tool() -def set_disassembly_comment(port: int = DEFAULT_GHIDRA_PORT, address: str = "", comment: str = "") -> dict: - """Add/edit disassembly comment at address - - Args: - port: Ghidra instance port (default: 8192) - address: Memory address in hex format - comment: Comment text to add - - Returns: - dict: Operation result with success status - """ - if not address: - return { - "success": False, - "error": "Address parameter is required", - "timestamp": int(time.time() * 1000) - } - - # Use the HATEOAS endpoint - return safe_post(port, f"programs/current/memory/{address}/comments/plate", { - "comment": comment - }) - - -@mcp.tool() -def rename_local_variable(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", old_name: str = "", new_name: str = "") -> dict: - """Rename local variable in function - - Args: - port: Ghidra instance port (default: 8192) - function_address: Function memory address in hex - old_name: Current variable name - new_name: New variable name - - Returns: - dict: Operation result with success status - """ - if not function_address or not old_name or not new_name: - return { - "success": False, - "error": "Function address, old name, and new name parameters are required", - "timestamp": int(time.time() * 1000) - } - - # Use the HATEOAS endpoint - return safe_patch(port, f"programs/current/functions/{function_address}/variables/{old_name}", { - "name": new_name - }) - - -@mcp.tool() -def rename_function_by_address(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", new_name: str = "") -> dict: - """Rename function at memory address - - Args: - port: Ghidra instance port (default: 8192) - function_address: Function memory address in hex - new_name: New function name - - Returns: - dict: Operation result with success status - """ - if not function_address or not new_name: - return { - "success": False, - "error": "Function address and new name parameters are required", - "timestamp": int(time.time() * 1000) - } - - # Use the HATEOAS endpoint - return safe_patch(port, f"programs/current/functions/{function_address}", { - "name": new_name - }) - - -@mcp.tool() -def set_function_prototype(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", prototype: str = "") -> dict: - """Update function signature/prototype - - Args: - port: Ghidra instance port (default: 8192) - function_address: Function memory address in hex - prototype: New prototype string (e.g. "int func(int param1)") - - Returns: - dict: Operation result with success status - """ - if not function_address or not prototype: - return { - "success": False, - "error": "Function address and prototype parameters are required", - "timestamp": int(time.time() * 1000) - } - - # Use the HATEOAS endpoint - return safe_patch(port, f"programs/current/functions/{function_address}", { - "signature": prototype - }) - - -@mcp.tool() -def set_local_variable_type(port: int = DEFAULT_GHIDRA_PORT, function_address: str = "", variable_name: str = "", new_type: str = "") -> dict: - """Change local variable data type - - Args: - port: Ghidra instance port (default: 8192) - function_address: Function memory address in hex - variable_name: Variable name to modify - new_type: New data type (e.g. "int", "char*") - - Returns: - dict: Operation result with success status - """ - if not function_address or not variable_name or not new_type: - return { - "success": False, - "error": "Function address, variable name, and new type parameters are required", - "timestamp": int(time.time() * 1000) - } - - # Use the HATEOAS endpoint - return safe_patch(port, f"programs/current/functions/{function_address}/variables/{variable_name}", { - "data_type": new_type - }) - - -@mcp.tool() -def list_variables(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, limit: int = 100, search: str = "") -> dict: - """List global variables with optional search - - Args: - port: Ghidra instance port (default: 8192) - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - search: Optional filter for variable names - - Returns: - dict: Contains variables list in 'result' field with pagination info - """ - params = {"offset": offset, "limit": limit} - if search: - params["name_contains"] = search - - # Use the HATEOAS endpoint - response = safe_get(port, "programs/current/variables", params) - if isinstance(response, dict) and "error" in response: - return response - - return { - "result": response.get("result", []), - "size": response.get("size", len(response.get("result", []))), - "offset": offset, - "limit": limit, - "_links": response.get("_links", {}) - } - - -@mcp.tool() -def list_function_variables(port: int = DEFAULT_GHIDRA_PORT, function: str = "") -> dict: - """List variables in function - - Args: - port: Ghidra instance port (default: 8192) - function: Function name to list variables for - - Returns: - dict: Contains variables list in 'result.variables' - """ - if not function: - return {"success": False, "error": "Function name is required"} - - encoded_name = quote(function) - return safe_get(port, f"functions/{encoded_name}/variables", {}) - - -@mcp.tool() -def rename_variable(port: int = DEFAULT_GHIDRA_PORT, function: str = "", name: str = "", new_name: str = "") -> dict: - """Rename variable in function - - Args: - port: Ghidra instance port (default: 8192) - function: Function name containing variable - name: Current variable name - new_name: New variable name - Returns: dict: Operation result """ - if not function or not name or not new_name: - return {"success": False, "error": "Function, name, and new_name parameters are required"} - - encoded_function = quote(function) - encoded_var = quote(name) - return safe_post(port, f"functions/{encoded_function}/variables/{encoded_var}", {"newName": new_name}) - - -@mcp.tool() -def retype_variable(port: int = DEFAULT_GHIDRA_PORT, function: str = "", name: str = "", data_type: str = "") -> dict: - """Change variable data type in function - - Args: - port: Ghidra instance port (default: 8192) - function: Function name containing variable - name: Variable name to modify - data_type: New data type - - Returns: - dict: Operation result - """ - if not function or not name or not data_type: - return {"success": False, "error": "Function, name, and data_type parameters are required"} - - encoded_function = quote(function) - encoded_var = quote(name) - return safe_post(port, f"functions/{encoded_function}/variables/{encoded_var}", {"dataType": data_type}) + if not address: + return { + "success": False, + "error": "Address parameter is required", + "timestamp": int(time.time() * 1000) + } + + payload = { + "comment": comment + } + + response = safe_post(port, f"memory/{address}/comments/{comment_type}", payload) + return simplify_response(response) def handle_sigint(signum, frame): @@ -1447,7 +1338,8 @@ def periodic_discovery(): for port, info in active_instances.items(): url = info["url"] try: - response = requests.get(f"{url}/instances", timeout=1) + # Check HATEOAS API via plugin-version endpoint + response = requests.get(f"{url}/plugin-version", timeout=1) if not response.ok: ports_to_remove.append(port) except requests.exceptions.RequestException: @@ -1476,4 +1368,4 @@ if __name__ == "__main__": discovery_thread.start() signal.signal(signal.SIGINT, handle_sigint) - mcp.run(transport="stdio") + mcp.run(transport="stdio") \ No newline at end of file