From f04223d23ac481724c2deb88c01b3ba3e21873ae Mon Sep 17 00:00:00 2001 From: Teal Bauer Date: Tue, 15 Apr 2025 12:16:42 +0200 Subject: [PATCH] refactor: clean up python comments and consolidate data test files --- MCP_BRIDGE_API.md | 150 ---- bridge_mcp_hydra.py | 9 +- error.tmp | 125 --- refactoring_namespaces.py | 1621 ------------------------------------- refactoring_proposal.md | 261 ------ refactoring_sample.py | 1286 ----------------------------- run_tests.py | 69 +- test_comments.py | 100 +-- test_data_create.py | 135 --- test_data_delete.py | 85 -- test_data_operations.py | 471 +++++++++-- test_data_simple.py | 54 -- test_data_type.py | 105 --- test_data_update.py | 181 ----- 14 files changed, 498 insertions(+), 4154 deletions(-) delete mode 100644 MCP_BRIDGE_API.md delete mode 100644 error.tmp delete mode 100644 refactoring_namespaces.py delete mode 100644 refactoring_proposal.md delete mode 100644 refactoring_sample.py delete mode 100644 test_data_create.py delete mode 100644 test_data_delete.py delete mode 100755 test_data_simple.py delete mode 100755 test_data_type.py delete mode 100755 test_data_update.py diff --git a/MCP_BRIDGE_API.md b/MCP_BRIDGE_API.md deleted file mode 100644 index a07711f..0000000 --- a/MCP_BRIDGE_API.md +++ /dev/null @@ -1,150 +0,0 @@ -# GhydraMCP Bridge API Documentation - -## Overview -This document describes the MCP tools and resources exposed by the GhydraMCP bridge that connects to Ghidra's HTTP API. The bridge provides a higher-level interface optimized for AI agent usage. - -## Core Concepts -- Each Ghidra instance runs its own HTTP server (default port 8192) -- The bridge discovers and manages multiple Ghidra instances -- Programs are addressed by their unique identifier within Ghidra (`project:/path/to/file`). -- The primary identifier for a program is its Ghidra path, e.g., `myproject:/path/to/mybinary.exe`. -- The bridge must keep track of which plugin host and port has which project & file and route accordingly -- Tools are organized by resource type (programs, functions, data, etc.) -- Consistent response format with success/error indicators - -## Instance Management Tools - -### `list_instances` -List all active Ghidra instances with their ports and project info. - -### `discover_instances` -Scan for available Ghidra instances by port range. - -### `register_instance` -Manually register a Ghidra instance by port/URL. - -## Program Analysis Tools - -### `list_functions` -List functions in current program with pagination. - -### `get_function` -Get details and decompilation for a function by name. - -### `get_function_by_address` -Get function details by memory address. - -### `decompile_function_by_address` -Decompile function at specific address. - -### `list_segments` -List memory segments/sections in program. - -### `list_data_items` -List defined data items in program. - -### `read_memory` -Read bytes from memory at address. Parameters: -- `address`: Hex address -- `length`: Bytes to read -- `format`: "hex", "base64" or "string" output format - -### `write_memory` -Write bytes to memory at address (use with caution). Parameters: -- `address`: Hex address -- `bytes`: Data to write -- `format`: "hex", "base64" or "string" input format - -### `list_variables` -List global variables with search/filter. - -## Modification Tools - -### `update_function` -Rename a function. - -### `update_data` -Rename data at memory address. - -### `set_function_prototype` -Change a function's signature. - -### `rename_local_variable` -Rename variable within function. - -### `set_local_variable_type` -Change variable's data type. - -## Response Format -All tools return responses in this format: -```json -{ - "id": "request-id", - "instance": "http://host:port", - "success": true/false, - "result": {...}, // Tool-specific data - "error": { // Only on failure - "code": "...", - "message": "..." - }, - "_links": { // HATEOAS links - "self": {"href": "/path"}, - "related": {"href": "/other"} - } -} -``` - -## Example Usage - -1. Discover available instances: -```python -discover_instances() -``` - -2. List functions in first instance: -```python -list_functions(port=8192, limit=10) -``` - -3. Decompile main function: -```python -get_function(port=8192, name="main") -``` - -4. Rename a function: -```python -update_function(port=8192, name="FUN_1234", new_name="parse_data") -``` - -## Error Handling -- Check `success` field first -- On failure, `error` contains details -- Common error codes: - - `INSTANCE_NOT_FOUND` - - `RESOURCE_NOT_FOUND` - - `INVALID_PARAMETER` - - `TRANSACTION_FAILED` - -## Advanced Analysis Tools - -### `list_xrefs` -List cross-references between code/data. Parameters: -- `to_addr`: Filter refs to this address -- `from_addr`: Filter refs from this address -- `type`: Filter by ref type ("CALL", "READ", etc) -- Basic pagination via `offset`/`limit` - -### `analyze_program` -Run Ghidra analysis with optional settings: -- `analysis_options`: Dict of analysis passes to enable - -### `get_callgraph` -Get function call graph visualization data: -- `function`: Starting function (defaults to entry point) -- `max_depth`: Maximum call depth (default: 3) - -### `get_dataflow` -Perform data flow analysis from address: -- `address`: Starting point in hex -- `direction`: "forward" or "backward" -- `max_steps`: Max analysis steps diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index 029c0e9..773784d 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -5,8 +5,8 @@ # "requests==2.32.3", # ] # /// -# GhydraMCP Bridge for Ghidra HATEOAS API - Refactored for MCP optimization -# This provides a revised implementation with namespaced tools +# GhydraMCP Bridge for Ghidra HATEOAS API - Optimized for MCP integration +# Provides namespaced tools for interacting with Ghidra's reverse engineering capabilities import os import signal import sys @@ -21,24 +21,19 @@ from mcp.server.fastmcp import FastMCP # ================= Core Infrastructure ================= -# Allowed origins for CORS/CSRF protection ALLOWED_ORIGINS = os.environ.get( "GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",") -# Track active Ghidra instances (port -> info dict) active_instances: Dict[int, dict] = {} instances_lock = Lock() DEFAULT_GHIDRA_PORT = 8192 DEFAULT_GHIDRA_HOST = "localhost" -# Port ranges for scanning QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) -# Version information BRIDGE_VERSION = "v2.0.0-beta.1" REQUIRED_API_VERSION = 2 -# Global state for the current instance current_instance_port = DEFAULT_GHIDRA_PORT instructions = """ diff --git a/error.tmp b/error.tmp deleted file mode 100644 index b3e9d1b..0000000 --- a/error.tmp +++ /dev/null @@ -1,125 +0,0 @@ -╭────────────────────────────────────────────────────────────────────────────────────── Traceback (most recent call last) ───────────────────────────────────────────────────────────────────────────────────────╮ -│ /Users/teal/.asdf/installs/python/3.11.1/lib/python3.11/site-packages/mcp/cli/cli.py:236 in dev │ -│ │ -│ 233 │ ╭────────────────────────────────── locals ──────────────────────────────────╮ │ -│ 234 │ try: │ file = PosixPath('/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py') │ │ -│ 235 │ │ # Import server to get dependencies │ file_spec = 'bridge_mcp_hydra.py' │ │ -│ ❱ 236 │ │ server = _import_server(file, server_object) │ server_object = None │ │ -│ 237 │ │ if hasattr(server, "dependencies"): │ with_editable = None │ │ -│ 238 │ │ │ with_packages = list(set(with_packages + server.dependencies)) │ with_packages = [] │ │ -│ 239 ╰────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /Users/teal/.asdf/installs/python/3.11.1/lib/python3.11/site-packages/mcp/cli/cli.py:142 in _import_server │ -│ │ -│ 139 │ │ sys.exit(1) │ -│ 140 │ │ -│ 141 │ module = importlib.util.module_from_spec(spec) │ -│ ❱ 142 │ spec.loader.exec_module(module) │ -│ 143 │ │ -│ 144 │ # If no object specified, try common server names │ -│ 145 │ if not server_object: │ -│ │ -│ ╭─────────────────────────────────────────────────────────────────────────────────────── locals ───────────────────────────────────────────────────────────────────────────────────────╮ │ -│ │ file = PosixPath('/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py') │ │ -│ │ file_dir = '/Users/teal/src/GhydraMCP' │ │ -│ │ module = │ │ -│ │ server_object = None │ │ -│ │ spec = ModuleSpec(name='server_module', loader=<_frozen_importlib_external.SourceFileLoader object at 0x102ed0750>, origin='/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py') │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ in exec_module:940 │ -│ ╭─────────────────────────────────────────────────── locals ───────────────────────────────────────────────────╮ │ -│ │ code = at 0x159961400, file "/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py", line 1> │ │ -│ │ module = │ │ -│ │ self = <_frozen_importlib_external.SourceFileLoader object at 0x102ed0750> │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ in _call_with_frames_removed:241 │ -│ ╭───────────────────────────────────────────────────────────────────────────────────────────── locals ─────────────────────────────────────────────────────────────────────────────────────────────╮ │ -│ │ args = ( │ │ -│ │ │ at 0x159961400, file "/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py", line 1>, │ │ -│ │ │ { │ │ -│ │ │ │ '__name__': 'server_module', │ │ -│ │ │ │ '__doc__': None, │ │ -│ │ │ │ '__package__': '', │ │ -│ │ │ │ '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x102ed0750>, │ │ -│ │ │ │ '__spec__': ModuleSpec(name='server_module', loader=<_frozen_importlib_external.SourceFileLoader object at 0x102ed0750>, origin='/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py'), │ │ -│ │ │ │ '__file__': '/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py', │ │ -│ │ │ │ '__cached__': '/Users/teal/src/GhydraMCP/__pycache__/bridge_mcp_hydra.cpython-311.pyc', │ │ -│ │ │ │ '__builtins__': { │ │ -│ │ │ │ │ '__name__': 'builtins', │ │ -│ │ │ │ │ '__doc__': 'Built-in functions, exceptions, and other objects.\n\nNoteworthy: None is the `nil'+46, │ │ -│ │ │ │ │ '__package__': '', │ │ -│ │ │ │ │ '__loader__': , │ │ -│ │ │ │ │ '__spec__': ModuleSpec(name='builtins', loader=, origin='built-in'), │ │ -│ │ │ │ │ '__build_class__': , │ │ -│ │ │ │ │ '__import__': , │ │ -│ │ │ │ │ 'abs': , │ │ -│ │ │ │ │ 'all': , │ │ -│ │ │ │ │ 'any': , │ │ -│ │ │ │ │ ... +147 │ │ -│ │ │ │ }, │ │ -│ │ │ │ '__annotations__': {'active_instances': typing.Dict[int, dict]}, │ │ -│ │ │ │ 'os': , │ │ -│ │ │ │ ... +42 │ │ -│ │ │ } │ │ -│ │ ) │ │ -│ │ f = │ │ -│ │ kwds = {} │ │ -│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │ -│ │ -│ /Users/teal/src/GhydraMCP/bridge_mcp_hydra.py:583 in │ -│ │ -│ 580 # Resources provide information that can be loaded directly into context │ -│ 581 # They focus on data and minimize metadata │ -│ 582 │ -│ ❱ 583 @mcp.resource() │ -│ 584 def ghidra_instance(port: int = None) -> dict: │ -│ 585 │ """Get detailed information about a Ghidra instance and the loaded program │ -│ 586 │ -│ │ -│ ╭─────────────────────────────────────────────────────────────────── locals ────────────────────────────────────────────────────────────────────╮ │ -│ │ _discover_instances = │ │ -│ │ _get_instance_port = │ │ -│ │ _make_request = │ │ -│ │ active_instances = {} │ │ -│ │ ALLOWED_ORIGINS = ['http://localhost'] │ │ -│ │ Any = typing.Any │ │ -│ │ BRIDGE_VERSION = 'v2.0.0-beta.1' │ │ -│ │ current_instance_port = 8192 │ │ -│ │ DEFAULT_GHIDRA_HOST = 'localhost' │ │ -│ │ DEFAULT_GHIDRA_PORT = 8192 │ │ -│ │ Dict = typing.Dict │ │ -│ │ FastMCP = │ │ -│ │ FULL_DISCOVERY_RANGE = range(8192, 8212) │ │ -│ │ get_instance_url = │ │ -│ │ ghidra_host = 'localhost' │ │ -│ │ handle_sigint = │ │ -│ │ instances_lock = │ │ -│ │ instructions = '\nGhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is '+497 │ │ -│ │ List = typing.List │ │ -│ │ Lock = │ │ -│ │ mcp = │ │ -│ │ Optional = typing.Optional │ │ -│ │ os = │ │ -│ │ periodic_discovery = │ │ -│ │ QUICK_DISCOVERY_RANGE = range(8192, 8202) │ │ -│ │ quote = │ │ -│ │ register_instance = │ │ -│ │ requests = │ │ -│ │ REQUIRED_API_VERSION = 2 │ │ -│ │ safe_delete = │ │ -│ │ safe_get = │ │ -│ │ safe_patch = │ │ -│ │ safe_post = │ │ -│ │ safe_put = │ │ -│ │ signal = │ │ -│ │ simplify_response = │ │ -│ │ sys = │ │ -│ │ threading = │ │ -│ │ time = │ │ -│ │ Union = typing.Union │ │ -│ │ urlencode = │ │ -│ │ urlparse = │ │ -│ │ validate_origin = │ │ -│ ╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │ -╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ -TypeError: FastMCP.resource() missing 1 required positional argument: 'uri' diff --git a/refactoring_namespaces.py b/refactoring_namespaces.py deleted file mode 100644 index cf6486e..0000000 --- a/refactoring_namespaces.py +++ /dev/null @@ -1,1621 +0,0 @@ -# /// script -# requires-python = ">=3.11" -# dependencies = [ -# "mcp==1.6.0", -# "requests==2.32.3", -# ] -# /// -# GhydraMCP Bridge for Ghidra HATEOAS API - Refactored for MCP optimization -# This provides a revised implementation without tool_group - -import os -import signal -import sys -import threading -import time -from threading import Lock -from typing import Dict, List, Optional, Union, Any -from urllib.parse import quote, urlencode, urlparse - -import requests -from mcp.server.fastmcp import FastMCP - -# ================= Core Infrastructure ================= - -# Allowed origins for CORS/CSRF protection -ALLOWED_ORIGINS = os.environ.get( - "GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",") - -# Track active Ghidra instances (port -> info dict) -active_instances: Dict[int, dict] = {} -instances_lock = Lock() -DEFAULT_GHIDRA_PORT = 8192 -DEFAULT_GHIDRA_HOST = "localhost" -# Port ranges for scanning -QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) -FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) - -# Version information -BRIDGE_VERSION = "v2.0.0-beta.1" -REQUIRED_API_VERSION = 2 - -# Global state for the current instance -current_instance_port = DEFAULT_GHIDRA_PORT - -instructions = """ -GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware. - -First, run `instances_discover()` to find open Ghidra instances. Then use `instances_use(port)` to set your working instance. - -The API is organized into namespaces for different types of operations: -- instances_* : For managing Ghidra instances -- functions_* : For working with functions -- data_* : For working with data items -- memory_* : For memory access -- xrefs_* : For cross-references -- analysis_* : For program analysis -""" - -mcp = FastMCP("GhydraMCP", version=BRIDGE_VERSION, instructions=instructions) - -ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST) - -# Helper function to get the current instance or validate a specific port -def _get_instance_port(port=None): - """Internal helper to get the current instance port or validate a specific port""" - port = port or current_instance_port - # Validate that the instance exists and is active - if port not in active_instances: - # Try to register it if not found - register_instance(port) - if port not in active_instances: - raise ValueError(f"No active Ghidra instance on port {port}") - return port - -# The rest of the utility functions (HTTP helpers, etc.) remain the same... -def get_instance_url(port: int) -> str: - """Get URL for a Ghidra instance by port""" - with instances_lock: - if port in active_instances: - return active_instances[port]["url"] - - if 8192 <= port <= 65535: - register_instance(port) - if port in active_instances: - return active_instances[port]["url"] - - return f"http://{ghidra_host}:{port}" - -def validate_origin(headers: dict) -> bool: - """Validate request origin against allowed origins""" - origin = headers.get("Origin") - if not origin: - # No origin header - allow (browser same-origin policy applies) - return True - - # Parse origin to get scheme+hostname - try: - parsed = urlparse(origin) - origin_base = f"{parsed.scheme}://{parsed.hostname}" - if parsed.port: - origin_base += f":{parsed.port}" - except: - return False - - return origin_base in ALLOWED_ORIGINS - -def _make_request(method: str, port: int, endpoint: str, params: dict = None, - json_data: dict = None, data: str = None, - headers: dict = None) -> dict: - """Internal helper to make HTTP requests and handle common errors.""" - url = f"{get_instance_url(port)}/{endpoint}" - - # Set up headers according to HATEOAS API expected format - request_headers = { - 'Accept': 'application/json', - 'X-Request-ID': f"mcp-bridge-{int(time.time() * 1000)}" - } - - if headers: - request_headers.update(headers) - - is_state_changing = method.upper() in ["POST", "PUT", "PATCH", "DELETE"] - if is_state_changing: - check_headers = json_data.get("headers", {}) if isinstance( - json_data, dict) else (headers or {}) - if not validate_origin(check_headers): - return { - "success": False, - "error": { - "code": "ORIGIN_NOT_ALLOWED", - "message": "Origin not allowed for state-changing request" - }, - "status_code": 403, - "timestamp": int(time.time() * 1000) - } - if json_data is not None: - request_headers['Content-Type'] = 'application/json' - elif data is not None: - request_headers['Content-Type'] = 'text/plain' - - try: - response = requests.request( - method, - url, - params=params, - json=json_data, - data=data, - headers=request_headers, - timeout=10 - ) - - try: - parsed_json = response.json() - - # Add timestamp if not present - if isinstance(parsed_json, dict) and "timestamp" not in parsed_json: - parsed_json["timestamp"] = int(time.time() * 1000) - - # Check for HATEOAS compliant error response format and reformat if needed - if not response.ok and isinstance(parsed_json, dict) and "success" in parsed_json and not parsed_json["success"]: - # Check if error is in the expected HATEOAS format - if "error" in parsed_json and not isinstance(parsed_json["error"], dict): - # Convert string error to the proper format - error_message = parsed_json["error"] - parsed_json["error"] = { - "code": f"HTTP_{response.status_code}", - "message": error_message - } - - return parsed_json - - except ValueError: - if response.ok: - return { - "success": False, - "error": { - "code": "NON_JSON_RESPONSE", - "message": "Received non-JSON success response from Ghidra plugin" - }, - "status_code": response.status_code, - "response_text": response.text[:500], - "timestamp": int(time.time() * 1000) - } - else: - return { - "success": False, - "error": { - "code": f"HTTP_{response.status_code}", - "message": f"Non-JSON error response: {response.text[:100]}..." - }, - "status_code": response.status_code, - "response_text": response.text[:500], - "timestamp": int(time.time() * 1000) - } - - except requests.exceptions.Timeout: - return { - "success": False, - "error": { - "code": "REQUEST_TIMEOUT", - "message": "Request timed out" - }, - "status_code": 408, - "timestamp": int(time.time() * 1000) - } - except requests.exceptions.ConnectionError: - return { - "success": False, - "error": { - "code": "CONNECTION_ERROR", - "message": f"Failed to connect to Ghidra instance at {url}" - }, - "status_code": 503, - "timestamp": int(time.time() * 1000) - } - except Exception as e: - return { - "success": False, - "error": { - "code": "UNEXPECTED_ERROR", - "message": f"An unexpected error occurred: {str(e)}" - }, - "exception": e.__class__.__name__, - "timestamp": int(time.time() * 1000) - } - -def safe_get(port: int, endpoint: str, params: dict = None) -> dict: - """Make GET request to Ghidra instance""" - return _make_request("GET", port, endpoint, params=params) - -def safe_put(port: int, endpoint: str, data: dict) -> dict: - """Make PUT request to Ghidra instance with JSON payload""" - headers = data.pop("headers", None) if isinstance(data, dict) else None - return _make_request("PUT", port, endpoint, json_data=data, headers=headers) - -def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict: - """Perform a POST request to a specific Ghidra instance with JSON or text payload""" - headers = None - json_payload = None - text_payload = None - - if isinstance(data, dict): - headers = data.pop("headers", None) - json_payload = data - else: - text_payload = data - - return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers) - -def safe_patch(port: int, endpoint: str, data: dict) -> dict: - """Perform a PATCH request to a specific Ghidra instance with JSON payload""" - headers = data.pop("headers", None) if isinstance(data, dict) else None - return _make_request("PATCH", port, endpoint, json_data=data, headers=headers) - -def safe_delete(port: int, endpoint: str) -> dict: - """Perform a DELETE request to a specific Ghidra instance""" - return _make_request("DELETE", port, endpoint) - -def simplify_response(response: dict) -> dict: - """ - Simplify HATEOAS response data for easier AI agent consumption - - Removes _links from result entries - - Flattens nested structures when appropriate - - Preserves important metadata - - Converts structured data like disassembly to text for easier consumption - """ - if not isinstance(response, dict): - return response - - # Make a copy to avoid modifying the original - result = response.copy() - - # Store API response metadata - api_metadata = {} - for key in ["id", "instance", "timestamp", "size", "offset", "limit"]: - if key in result: - api_metadata[key] = result.get(key) - - # Simplify the main result data if present - if "result" in result: - # Handle array results - if isinstance(result["result"], list): - simplified_items = [] - for item in result["result"]: - if isinstance(item, dict): - # Store but remove HATEOAS links from individual items - item_copy = item.copy() - links = item_copy.pop("_links", None) - - # Optionally store direct href links as more accessible properties - # This helps AI agents navigate the API without understanding HATEOAS - if isinstance(links, dict): - for link_name, link_data in links.items(): - if isinstance(link_data, dict) and "href" in link_data: - item_copy[f"{link_name}_url"] = link_data["href"] - - simplified_items.append(item_copy) - else: - simplified_items.append(item) - result["result"] = simplified_items - - # Handle object results - elif isinstance(result["result"], dict): - result_copy = result["result"].copy() - - # Store but remove links from result object - links = result_copy.pop("_links", None) - - # Add direct href links for easier navigation - if isinstance(links, dict): - for link_name, link_data in links.items(): - if isinstance(link_data, dict) and "href" in link_data: - result_copy[f"{link_name}_url"] = link_data["href"] - - # Special case for disassembly - convert to text for easier consumption - if "instructions" in result_copy and isinstance(result_copy["instructions"], list): - disasm_text = "" - for instr in result_copy["instructions"]: - if isinstance(instr, dict): - addr = instr.get("address", "") - mnemonic = instr.get("mnemonic", "") - operands = instr.get("operands", "") - bytes_str = instr.get("bytes", "") - - # Format: address: bytes mnemonic operands - disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" - - # Add the text representation while preserving the original structured data - result_copy["disassembly_text"] = disasm_text - - # Special case for decompiled code - make sure it's directly accessible - if "ccode" in result_copy: - result_copy["decompiled_text"] = result_copy["ccode"] - elif "decompiled" in result_copy: - result_copy["decompiled_text"] = result_copy["decompiled"] - - result["result"] = result_copy - - # Store but remove HATEOAS links from the top level - links = result.pop("_links", None) - - # Add direct href links in a more accessible format - if isinstance(links, dict): - api_links = {} - for link_name, link_data in links.items(): - if isinstance(link_data, dict) and "href" in link_data: - api_links[link_name] = link_data["href"] - - # Add simplified links - if api_links: - result["api_links"] = api_links - - # Restore API metadata - for key, value in api_metadata.items(): - if key not in result: - result[key] = value - - return result - -def register_instance(port: int, url: str = None) -> str: - """Register a new Ghidra instance - - Args: - port: Port number of the Ghidra instance - url: Optional URL if different from default http://host:port - - Returns: - str: Confirmation message or error - """ - if url is None: - url = f"http://{ghidra_host}:{port}" - - try: - # Check for HATEOAS API by checking plugin-version endpoint - test_url = f"{url}/plugin-version" - response = requests.get(test_url, timeout=2) - - if not response.ok: - return f"Error: Instance at {url} is not responding properly to HATEOAS API" - - project_info = {"url": url} - - try: - # Check plugin version to ensure compatibility - try: - version_data = response.json() - if "result" in version_data: - result = version_data["result"] - if isinstance(result, dict): - plugin_version = result.get("plugin_version", "") - api_version = result.get("api_version", 0) - - project_info["plugin_version"] = plugin_version - project_info["api_version"] = api_version - - # Verify API version compatibility - if api_version != REQUIRED_API_VERSION: - error_msg = f"API version mismatch: Plugin reports version {api_version}, but bridge requires version {REQUIRED_API_VERSION}" - print(error_msg, file=sys.stderr) - return error_msg - - print(f"Connected to Ghidra plugin version {plugin_version} with API version {api_version}") - except Exception as e: - print(f"Error parsing plugin version: {e}", file=sys.stderr) - - # Get program info from HATEOAS API - info_url = f"{url}/program" - - try: - info_response = requests.get(info_url, timeout=2) - if info_response.ok: - try: - info_data = info_response.json() - if "result" in info_data: - result = info_data["result"] - if isinstance(result, dict): - # Extract project and file from programId (format: "project:/file") - program_id = result.get("programId", "") - if ":" in program_id: - project_name, file_path = program_id.split(":", 1) - project_info["project"] = project_name - # Remove leading slash from file path if present - if file_path.startswith("/"): - file_path = file_path[1:] - project_info["path"] = file_path - - # Get file name directly from the result - project_info["file"] = result.get("name", "") - - # Get other metadata - project_info["language_id"] = result.get("languageId", "") - project_info["compiler_spec_id"] = result.get("compilerSpecId", "") - project_info["image_base"] = result.get("image_base", "") - - # Store _links from result for HATEOAS navigation - if "_links" in result: - project_info["_links"] = result.get("_links", {}) - except Exception as e: - print(f"Error parsing info endpoint: {e}", file=sys.stderr) - except Exception as e: - print(f"Error connecting to info endpoint: {e}", file=sys.stderr) - except Exception: - # Non-critical, continue with registration even if project info fails - pass - - with instances_lock: - active_instances[port] = project_info - - return f"Registered instance on port {port} at {url}" - except Exception as e: - return f"Error: Could not connect to instance at {url}: {str(e)}" - -def _discover_instances(port_range, host=None, timeout=0.5) -> dict: - """Internal function to discover Ghidra instances by scanning ports""" - found_instances = [] - scan_host = host if host is not None else ghidra_host - - for port in port_range: - if port in active_instances: - continue - - url = f"http://{scan_host}:{port}" - try: - # Try HATEOAS API via plugin-version endpoint - test_url = f"{url}/plugin-version" - response = requests.get(test_url, - headers={'Accept': 'application/json', - 'X-Request-ID': f"discovery-{int(time.time() * 1000)}"}, - timeout=timeout) - - if response.ok: - # Further validate it's a GhydraMCP instance by checking response format - try: - json_data = response.json() - if "success" in json_data and json_data["success"] and "result" in json_data: - # Looks like a valid HATEOAS API response - # Instead of relying only on register_instance, which already checks program info, - # extract additional information here for more detailed discovery results - result = register_instance(port, url) - - # Initialize report info - instance_info = { - "port": port, - "url": url - } - - # Extract version info for reporting - if isinstance(json_data["result"], dict): - instance_info["plugin_version"] = json_data["result"].get("plugin_version", "unknown") - instance_info["api_version"] = json_data["result"].get("api_version", "unknown") - else: - instance_info["plugin_version"] = "unknown" - instance_info["api_version"] = "unknown" - - # Include project details from registered instance in the report - if port in active_instances: - instance_info["project"] = active_instances[port].get("project", "") - instance_info["file"] = active_instances[port].get("file", "") - - instance_info["result"] = result - found_instances.append(instance_info) - except (ValueError, KeyError): - # Not a valid JSON response or missing expected keys - print(f"Port {port} returned non-HATEOAS response", file=sys.stderr) - continue - - except requests.exceptions.RequestException: - # Instance not available, just continue - continue - - return { - "found": len(found_instances), - "instances": found_instances - } - -def periodic_discovery(): - """Periodically discover new instances""" - while True: - try: - _discover_instances(FULL_DISCOVERY_RANGE, timeout=0.5) - - with instances_lock: - ports_to_remove = [] - for port, info in active_instances.items(): - url = info["url"] - try: - # Check HATEOAS API via plugin-version endpoint - response = requests.get(f"{url}/plugin-version", timeout=1) - if not response.ok: - ports_to_remove.append(port) - continue - - # Update program info if available (especially to get project name) - try: - info_url = f"{url}/program" - info_response = requests.get(info_url, timeout=1) - if info_response.ok: - try: - info_data = info_response.json() - if "result" in info_data: - result = info_data["result"] - if isinstance(result, dict): - # Extract project and file from programId (format: "project:/file") - program_id = result.get("programId", "") - if ":" in program_id: - project_name, file_path = program_id.split(":", 1) - info["project"] = project_name - # Remove leading slash from file path if present - if file_path.startswith("/"): - file_path = file_path[1:] - info["path"] = file_path - - # Get file name directly from the result - info["file"] = result.get("name", "") - - # Get other metadata - info["language_id"] = result.get("languageId", "") - info["compiler_spec_id"] = result.get("compilerSpecId", "") - info["image_base"] = result.get("image_base", "") - except Exception as e: - print(f"Error parsing info endpoint during discovery: {e}", file=sys.stderr) - except Exception: - # Non-critical, continue even if update fails - pass - - except requests.exceptions.RequestException: - ports_to_remove.append(port) - - for port in ports_to_remove: - del active_instances[port] - print(f"Removed unreachable instance on port {port}") - except Exception as e: - print(f"Error in periodic discovery: {e}") - - time.sleep(30) - -def handle_sigint(signum, frame): - os._exit(0) - -# ================= MCP Resources ================= -# Resources provide information that can be loaded directly into context -# They focus on data and minimize metadata - -@mcp.resource() -def ghidra_instance(port: int = None) -> dict: - """Get detailed information about a Ghidra instance and the loaded program - - Args: - port: Specific Ghidra instance port (optional, uses current if omitted) - - Returns: - dict: Detailed information about the Ghidra instance and loaded program - """ - port = _get_instance_port(port) - response = safe_get(port, "program") - - if not isinstance(response, dict) or not response.get("success", False): - return {"error": f"Unable to access Ghidra instance on port {port}"} - - # Extract only the most relevant information for the resource - result = response.get("result", {}) - - if not isinstance(result, dict): - return {"error": "Invalid response format from Ghidra instance"} - - instance_info = { - "port": port, - "url": get_instance_url(port), - "program_name": result.get("name", "unknown"), - "program_id": result.get("programId", "unknown"), - "language": result.get("languageId", "unknown"), - "compiler": result.get("compilerSpecId", "unknown"), - "base_address": result.get("imageBase", "0x0"), - "memory_size": result.get("memorySize", 0), - "analysis_complete": result.get("analysisComplete", False) - } - - # Add project information if available - if "project" in active_instances[port]: - instance_info["project"] = active_instances[port]["project"] - - return instance_info - -@mcp.resource() -def decompiled_function(name: str = None, address: str = None, port: int = None) -> str: - """Get decompiled C code for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - - Returns: - str: The decompiled C code as a string, or error message - """ - if not name and not address: - return "Error: Either name or address parameter is required" - - port = _get_instance_port(port) - - params = { - "syntax_tree": "false", - "style": "normalize" - } - - if address: - endpoint = f"functions/{address}/decompile" - else: - endpoint = f"functions/by-name/{quote(name)}/decompile" - - response = safe_get(port, endpoint, params) - simplified = simplify_response(response) - - # For a resource, we want to directly return just the decompiled code - if (not isinstance(simplified, dict) or - not simplified.get("success", False) or - "result" not in simplified): - error_message = "Error: Could not decompile function" - if isinstance(simplified, dict) and "error" in simplified: - if isinstance(simplified["error"], dict): - error_message = simplified["error"].get("message", error_message) - else: - error_message = str(simplified["error"]) - return error_message - - # Extract just the decompiled code text - result = simplified["result"] - - # Different endpoints may return the code in different fields, try all of them - if isinstance(result, dict): - for key in ["decompiled_text", "ccode", "decompiled"]: - if key in result: - return result[key] - - return "Error: Could not extract decompiled code from response" - -@mcp.resource() -def function_info(name: str = None, address: str = None, port: int = None) -> dict: - """Get detailed information about a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Complete function information including signature, parameters, etc. - """ - if not name and not address: - return {"error": "Either name or address parameter is required"} - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" - - response = safe_get(port, endpoint) - simplified = simplify_response(response) - - if (not isinstance(simplified, dict) or - not simplified.get("success", False) or - "result" not in simplified): - error = {"error": "Could not get function information"} - if isinstance(simplified, dict) and "error" in simplified: - error["error_details"] = simplified["error"] - return error - - # Return just the function data without API metadata - return simplified["result"] - -@mcp.resource() -def disassembly(name: str = None, address: str = None, port: int = None) -> str: - """Get disassembled instructions for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - - Returns: - str: Formatted disassembly listing as a string - """ - if not name and not address: - return "Error: Either name or address parameter is required" - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}/disassembly" - else: - endpoint = f"functions/by-name/{quote(name)}/disassembly" - - response = safe_get(port, endpoint) - simplified = simplify_response(response) - - if (not isinstance(simplified, dict) or - not simplified.get("success", False) or - "result" not in simplified): - error_message = "Error: Could not get disassembly" - if isinstance(simplified, dict) and "error" in simplified: - if isinstance(simplified["error"], dict): - error_message = simplified["error"].get("message", error_message) - else: - error_message = str(simplified["error"]) - return error_message - - # For a resource, we want to directly return just the disassembly text - result = simplified["result"] - - # Check if we have a disassembly_text field already - if isinstance(result, dict) and "disassembly_text" in result: - return result["disassembly_text"] - - # Otherwise if we have raw instructions, format them ourselves - if isinstance(result, dict) and "instructions" in result and isinstance(result["instructions"], list): - disasm_text = "" - for instr in result["instructions"]: - if isinstance(instr, dict): - addr = instr.get("address", "") - mnemonic = instr.get("mnemonic", "") - operands = instr.get("operands", "") - bytes_str = instr.get("bytes", "") - - # Format: address: bytes mnemonic operands - disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" - - return disasm_text - - # If we have a direct disassembly field, try that as well - if isinstance(result, dict) and "disassembly" in result: - return result["disassembly"] - - return "Error: Could not extract disassembly from response" - -# ================= MCP Prompts ================= -# Prompts define reusable templates for LLM interactions - -@mcp.prompt("analyze_function") -def analyze_function_prompt(name: str = None, address: str = None, port: int = None): - """A prompt to guide the LLM through analyzing a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - """ - port = _get_instance_port(port) - - # Get function name if only address is provided - if address and not name: - fn_info = function_info(address=address, port=port) - if isinstance(fn_info, dict) and "name" in fn_info: - name = fn_info["name"] - - # Create the template that guides analysis - return { - "prompt": f""" - Analyze the following function: {name or address} - - Decompiled code: - ```c - {decompiled_function(name=name, address=address, port=port)} - ``` - - Disassembly: - ``` - {disassembly(name=name, address=address, port=port)} - ``` - - 1. What is the purpose of this function? - 2. What are the key parameters and their uses? - 3. What are the return values and their meanings? - 4. Are there any security concerns in this implementation? - 5. Describe the algorithm or process being implemented. - """, - "context": { - "function_info": function_info(name=name, address=address, port=port) - } - } - -@mcp.prompt("identify_vulnerabilities") -def identify_vulnerabilities_prompt(name: str = None, address: str = None, port: int = None): - """A prompt to help identify potential vulnerabilities in a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - """ - port = _get_instance_port(port) - - # Get function name if only address is provided - if address and not name: - fn_info = function_info(address=address, port=port) - if isinstance(fn_info, dict) and "name" in fn_info: - name = fn_info["name"] - - # Create the template focused on security analysis - return { - "prompt": f""" - Analyze the following function for security vulnerabilities: {name or address} - - Decompiled code: - ```c - {decompiled_function(name=name, address=address, port=port)} - ``` - - Look for these vulnerability types: - 1. Buffer overflows or underflows - 2. Integer overflow/underflow - 3. Use-after-free or double-free bugs - 4. Format string vulnerabilities - 5. Missing bounds checks - 6. Insecure memory operations - 7. Race conditions or timing issues - 8. Input validation problems - - For each potential vulnerability: - - Describe the vulnerability and where it occurs - - Explain the security impact - - Suggest how it could be exploited - - Recommend a fix - """, - "context": { - "function_info": function_info(name=name, address=address, port=port), - "disassembly": disassembly(name=name, address=address, port=port) - } - } - -# ================= MCP Tools ================= -# Since we can't use tool groups, we'll use namespaces in the function names - -# Instance management tools -@mcp.tool() -def instances_list() -> dict: - """List all active Ghidra instances""" - with instances_lock: - return { - "instances": [ - { - "port": port, - "url": info["url"], - "project": info.get("project", ""), - "file": info.get("file", "") - } - for port, info in active_instances.items() - ] - } - -@mcp.tool() -def instances_discover(host: str = None) -> dict: - """Discover available Ghidra instances by scanning ports - - Args: - host: Optional host to scan (default: configured ghidra_host) - - Returns: - dict: Contains 'found' count and 'instances' list with discovery results - """ - return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5) - -@mcp.tool() -def instances_register(port: int, url: str = None) -> str: - """Register a new Ghidra instance - - Args: - port: Port number of the Ghidra instance - url: Optional URL if different from default http://host:port - - Returns: - str: Confirmation message or error - """ - return register_instance(port, url) - -@mcp.tool() -def instances_unregister(port: int) -> str: - """Unregister a Ghidra instance - - Args: - port: Port number of the instance to unregister - - Returns: - str: Confirmation message or error - """ - with instances_lock: - if port in active_instances: - del active_instances[port] - return f"Unregistered instance on port {port}" - return f"No instance found on port {port}" - -@mcp.tool() -def instances_use(port: int) -> str: - """Set the current working Ghidra instance - - Args: - port: Port number of the instance to use - - Returns: - str: Confirmation message or error - """ - global current_instance_port - - # First validate that the instance exists and is active - if port not in active_instances: - # Try to register it if not found - register_instance(port) - if port not in active_instances: - return f"Error: No active Ghidra instance found on port {port}" - - # Set as current instance - current_instance_port = port - - # Return information about the selected instance - with instances_lock: - info = active_instances[port] - program = info.get("file", "unknown program") - project = info.get("project", "unknown project") - return f"Now using Ghidra instance on port {port} with {program} in project {project}" - -@mcp.tool() -def instances_current() -> dict: - """Get information about the current working Ghidra instance - - Returns: - dict: Details about the current instance and program - """ - return ghidra_instance(port=current_instance_port) - -# Function tools -@mcp.tool() -def functions_list(offset: int = 0, limit: int = 100, - name_contains: str = None, - name_matches_regex: str = None, - port: int = None) -> dict: - """List functions with filtering and pagination - - Args: - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - name_contains: Substring name filter (case-insensitive) - name_matches_regex: Regex name filter - port: Specific Ghidra instance port (optional) - - Returns: - dict: List of functions with pagination information - """ - port = _get_instance_port(port) - - params = { - "offset": offset, - "limit": limit - } - if name_contains: - params["name_contains"] = name_contains - if name_matches_regex: - params["name_matches_regex"] = name_matches_regex - - response = safe_get(port, "functions", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified - -@mcp.tool() -def functions_get(name: str = None, address: str = None, port: int = None) -> dict: - """Get detailed information about a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Detailed function information - """ - if not name and not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" - - response = safe_get(port, endpoint) - return simplify_response(response) - -@mcp.tool() -def functions_decompile(name: str = None, address: str = None, - syntax_tree: bool = False, style: str = "normalize", - port: int = None) -> dict: - """Get decompiled code for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - syntax_tree: Include syntax tree (default: False) - style: Decompiler style (default: "normalize") - port: Specific Ghidra instance port (optional) - - Returns: - dict: Contains function information and decompiled code - """ - if not name and not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - params = { - "syntax_tree": str(syntax_tree).lower(), - "style": style - } - - if address: - endpoint = f"functions/{address}/decompile" - else: - endpoint = f"functions/by-name/{quote(name)}/decompile" - - response = safe_get(port, endpoint, params) - simplified = simplify_response(response) - - # For AI consumption, make the decompiled code more directly accessible - if "result" in simplified and isinstance(simplified["result"], dict): - if "decompiled" in simplified["result"]: - simplified["decompiled_code"] = simplified["result"]["decompiled"] - elif "ccode" in simplified["result"]: - simplified["decompiled_code"] = simplified["result"]["ccode"] - elif "decompiled_text" in simplified["result"]: - simplified["decompiled_code"] = simplified["result"]["decompiled_text"] - - return simplified - -@mcp.tool() -def functions_disassemble(name: str = None, address: str = None, port: int = None) -> dict: - """Get disassembly for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Contains function information and disassembly text - """ - if not name and not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}/disassembly" - else: - endpoint = f"functions/by-name/{quote(name)}/disassembly" - - response = safe_get(port, endpoint) - return simplify_response(response) - -@mcp.tool() -def functions_create(address: str, port: int = None) -> dict: - """Create a new function at the specified address - - Args: - address: Memory address in hex format where function starts - port: Specific Ghidra instance port (optional) - - Returns: - dict: Operation result with the created function information - """ - if not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - payload = { - "address": address - } - - response = safe_post(port, "functions", payload) - return simplify_response(response) - -@mcp.tool() -def functions_rename(old_name: str = None, address: str = None, new_name: str = "", port: int = None) -> dict: - """Rename a function - - Args: - old_name: Current function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - new_name: New function name - port: Specific Ghidra instance port (optional) - - Returns: - dict: Operation result with the updated function information - """ - if not (old_name or address) or not new_name: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either old_name or address, and new_name parameters are required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - payload = { - "name": new_name - } - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(old_name)}" - - response = safe_patch(port, endpoint, payload) - return simplify_response(response) - -@mcp.tool() -def functions_set_signature(name: str = None, address: str = None, signature: str = "", port: int = None) -> dict: - """Set function signature/prototype - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - signature: New function signature (e.g., "int func(char *data, int size)") - port: Specific Ghidra instance port (optional) - - Returns: - dict: Operation result with the updated function information - """ - if not (name or address) or not signature: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address, and signature parameters are required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - payload = { - "signature": signature - } - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" - - response = safe_patch(port, endpoint, payload) - return simplify_response(response) - -@mcp.tool() -def functions_get_variables(name: str = None, address: str = None, port: int = None) -> dict: - """Get variables for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Contains function information and list of variables - """ - if not name and not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}/variables" - else: - endpoint = f"functions/by-name/{quote(name)}/variables" - - response = safe_get(port, endpoint) - return simplify_response(response) - -# Memory tools -@mcp.tool() -def memory_read(address: str, length: int = 16, format: str = "hex", port: int = None) -> dict: - """Read bytes from memory - - Args: - address: Memory address in hex format - length: Number of bytes to read (default: 16) - format: Output format - "hex", "base64", or "string" (default: "hex") - port: Specific Ghidra instance port (optional) - - Returns: - dict: { - "address": original address, - "length": bytes read, - "format": output format, - "hexBytes": the memory contents as hex string, - "rawBytes": the memory contents as base64 string, - "timestamp": response timestamp - } - """ - if not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - # Use query parameters instead of path parameters for more reliable handling - params = { - "address": address, - "length": length, - "format": format - } - - response = safe_get(port, "memory", params) - simplified = simplify_response(response) - - # Ensure the result is simple and directly usable - if "result" in simplified and isinstance(simplified["result"], dict): - result = simplified["result"] - - # Pass through all representations of the bytes - memory_info = { - "success": True, - "address": result.get("address", address), - "length": result.get("bytesRead", length), - "format": format, - "timestamp": simplified.get("timestamp", int(time.time() * 1000)) - } - - # Include all the different byte representations - if "hexBytes" in result: - memory_info["hexBytes"] = result["hexBytes"] - if "rawBytes" in result: - memory_info["rawBytes"] = result["rawBytes"] - - return memory_info - - return simplified - -@mcp.tool() -def memory_write(address: str, bytes_data: str, format: str = "hex", port: int = None) -> dict: - """Write bytes to memory (use with caution) - - Args: - address: Memory address in hex format - bytes_data: Data to write (format depends on 'format' parameter) - format: Input format - "hex", "base64", or "string" (default: "hex") - port: Specific Ghidra instance port (optional) - - Returns: - dict: Operation result with success status - """ - if not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - if not bytes_data: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Bytes parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - payload = { - "bytes": bytes_data, - "format": format - } - - response = safe_patch(port, f"memory/{address}", payload) - return simplify_response(response) - -# Xrefs tools -@mcp.tool() -def xrefs_list(to_addr: str = None, from_addr: str = None, type: str = None, - offset: int = 0, limit: int = 100, port: int = None) -> dict: - """List cross-references with filtering and pagination - - Args: - to_addr: Filter references to this address (hexadecimal) - from_addr: Filter references from this address (hexadecimal) - type: Filter by reference type (e.g. "CALL", "READ", "WRITE") - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Cross-references matching the filters - """ - # At least one of the address parameters must be provided - if not to_addr and not from_addr: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either to_addr or from_addr parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - params = { - "offset": offset, - "limit": limit - } - if to_addr: - params["to_addr"] = to_addr - if from_addr: - params["from_addr"] = from_addr - if type: - params["type"] = type - - response = safe_get(port, "xrefs", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified - -# Data tools -@mcp.tool() -def data_list(offset: int = 0, limit: int = 100, addr: str = None, - name: str = None, name_contains: str = None, type: str = None, - port: int = None) -> dict: - """List defined data items with filtering and pagination - - Args: - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - addr: Filter by address (hexadecimal) - name: Exact name match filter (case-sensitive) - name_contains: Substring name filter (case-insensitive) - type: Filter by data type (e.g. "string", "dword") - port: Specific Ghidra instance port (optional) - - Returns: - dict: Data items matching the filters - """ - port = _get_instance_port(port) - - params = { - "offset": offset, - "limit": limit - } - if addr: - params["addr"] = addr - if name: - params["name"] = name - if name_contains: - params["name_contains"] = name_contains - if type: - params["type"] = type - - response = safe_get(port, "data", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified - -@mcp.tool() -def data_create(address: str, data_type: str, size: int = None, port: int = None) -> dict: - """Define a new data item at the specified address - - Args: - address: Memory address in hex format - data_type: Data type (e.g. "string", "dword", "byte") - size: Optional size in bytes for the data item - port: Specific Ghidra instance port (optional) - - Returns: - dict: Operation result with the created data information - """ - if not address or not data_type: - return { - "success": False, - "error": "Address and data_type parameters are required", - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - payload = { - "address": address, - "type": data_type - } - - if size is not None: - payload["size"] = size - - response = safe_post(port, "data", payload) - return simplify_response(response) - -@mcp.tool() -def data_rename(address: str, name: str, port: int = None) -> dict: - """Rename a data item - - Args: - address: Memory address in hex format - name: New name for the data item - port: Specific Ghidra instance port (optional) - - Returns: - dict: Operation result with the updated data information - """ - if not address or not name: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address and name parameters are required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - payload = { - "address": address, - "newName": name - } - - response = safe_post(port, "data", payload) - return simplify_response(response) - -# Analysis tools -@mcp.tool() -def analysis_run(port: int = None, analysis_options: dict = None) -> dict: - """Run analysis on the current program - - Args: - analysis_options: Dictionary of analysis options to enable/disable - (e.g. {"functionRecovery": True, "dataRefs": False}) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Analysis operation result with status - """ - port = _get_instance_port(port) - response = safe_post(port, "analysis", analysis_options or {}) - return simplify_response(response) - -@mcp.tool() -def analysis_get_callgraph(function: str = None, max_depth: int = 3, port: int = None) -> dict: - """Get function call graph visualization data - - Args: - function: Starting function name or address (None starts from entry point) - max_depth: Maximum call depth to analyze (default: 3) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Graph data with nodes and edges - """ - port = _get_instance_port(port) - - params = {"max_depth": max_depth} - if function: - params["function"] = function - - response = safe_get(port, "analysis/callgraph", params) - return simplify_response(response) - -@mcp.tool() -def analysis_get_dataflow(address: str, direction: str = "forward", max_steps: int = 50, port: int = None) -> dict: - """Perform data flow analysis from an address - - Args: - address: Starting address in hex format - direction: "forward" or "backward" (default: "forward") - max_steps: Maximum analysis steps (default: 50) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Data flow analysis results - """ - if not address: - return { - "success": False, - "error": "Address parameter is required", - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - params = { - "address": address, - "direction": direction, - "max_steps": max_steps - } - - response = safe_get(port, "analysis/dataflow", params) - return simplify_response(response) - -# ================= Startup ================= - -if __name__ == "__main__": - register_instance(DEFAULT_GHIDRA_PORT, - f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}") - - # Use quick discovery on startup - _discover_instances(QUICK_DISCOVERY_RANGE) - - # Start background discovery thread - discovery_thread = threading.Thread( - target=periodic_discovery, - daemon=True, - name="GhydraMCP-Discovery" - ) - discovery_thread.start() - - signal.signal(signal.SIGINT, handle_sigint) - mcp.run(transport="stdio") \ No newline at end of file diff --git a/refactoring_proposal.md b/refactoring_proposal.md deleted file mode 100644 index 43ac518..0000000 --- a/refactoring_proposal.md +++ /dev/null @@ -1,261 +0,0 @@ -# GhydraMCP Bridge Refactoring Proposal - -## Current Issues - -The current bridge implementation exposes all functionality as MCP tools, which creates several problems: - -1. **Discoverability**: With dozens of tool functions, it's difficult for AI agents to identify the correct tool to use for a specific task. - -2. **Consistency**: The API surface is large and not organized by conceptual resources, making it harder to understand what's related. - -3. **Context Loading**: Many operations require repeated loading of program information that could be provided more efficiently as resources. - -4. **Default Selection**: The current approach requires explicit port selection for each operation, instead of following a "current working instance" pattern. - -## Proposed MCP-Oriented Refactoring - -Restructure the bridge to follow MCP patterns more closely: - -### 1. Resources (for Context Loading) - -Resources provide information that can be loaded directly into the LLM's context. - -```python -@mcp.resource() -def ghidra_instance(port: int = None) -> dict: - """Get information about a Ghidra instance or the current working instance - - Args: - port: Specific Ghidra instance port (optional, uses current if omitted) - - Returns: - dict: Detailed information about the Ghidra instance and loaded program - """ - # Implementation that gets instance info and the current program details - # from the currently selected "working" instance or a specific port -``` - -```python -@mcp.resource() -def decompiled_function(name: str = None, address: str = None) -> str: - """Get decompiled C code for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - - Returns: - str: The decompiled C code as a string - """ - # Implementation that only returns the decompiled text directly -``` - -```python -@mcp.resource() -def function_info(name: str = None, address: str = None) -> dict: - """Get detailed information about a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - - Returns: - dict: Complete function information including signature, parameters, etc. - """ - # Implementation that returns detailed function information -``` - -```python -@mcp.resource() -def disassembly(name: str = None, address: str = None) -> str: - """Get disassembled instructions for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - - Returns: - str: Formatted disassembly listing as a string - """ - # Implementation that returns formatted text disassembly -``` - -### 2. Prompts (for Interaction Patterns) - -Prompts define reusable templates for LLM interactions, making common workflows easier. - -```python -@mcp.prompt("analyze_function") -def analyze_function_prompt(name: str = None, address: str = None): - """A prompt that guides the LLM through analyzing a function's purpose - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - """ - # Implementation returns a prompt template with decompiled code and disassembly - # that helps the LLM systematically analyze a function - return { - "prompt": f""" - Analyze the following function: {name or address} - - Decompiled code: - ```c - {decompiled_function(name=name, address=address)} - ``` - - Disassembly: - ``` - {disassembly(name=name, address=address)} - ``` - - 1. What is the purpose of this function? - 2. What are the key parameters and their uses? - 3. What are the return values and their meanings? - 4. Are there any security concerns in this implementation? - 5. Describe the algorithm or process being implemented. - """, - "context": { - "function_info": function_info(name=name, address=address) - } - } -``` - -```python -@mcp.prompt("identify_vulnerabilities") -def identify_vulnerabilities_prompt(name: str = None, address: str = None): - """A prompt that helps the LLM identify potential vulnerabilities in a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - """ - # Implementation returns a prompt focused on finding security issues -``` - -### 3. Tools (for Function Selection) - -Tools are organized by domain concepts rather than just mirroring the low-level API. - -```python -@mcp.tool_group("instances") -class InstanceTools: - @mcp.tool() - def list() -> dict: - """List all active Ghidra instances""" - return list_instances() - - @mcp.tool() - def discover() -> dict: - """Discover available Ghidra instances""" - return discover_instances() - - @mcp.tool() - def register(port: int, url: str = None) -> str: - """Register a new Ghidra instance""" - return register_instance(port, url) - - @mcp.tool() - def use(port: int) -> str: - """Set the current working Ghidra instance""" - # Implementation that sets the default instance - global current_instance_port - current_instance_port = port - return f"Now using Ghidra instance on port {port}" -``` - -```python -@mcp.tool_group("functions") -class FunctionTools: - @mcp.tool() - def list(offset: int = 0, limit: int = 100, **filters) -> dict: - """List functions with filtering and pagination""" - # Implementation that uses the current instance - return list_functions(port=current_instance_port, offset=offset, limit=limit, **filters) - - @mcp.tool() - def get(name: str = None, address: str = None) -> dict: - """Get detailed information about a function""" - return get_function(port=current_instance_port, name=name, address=address) - - @mcp.tool() - def create(address: str) -> dict: - """Create a new function at the specified address""" - return create_function(port=current_instance_port, address=address) - - @mcp.tool() - def rename(name: str = None, address: str = None, new_name: str = "") -> dict: - """Rename a function""" - return rename_function(port=current_instance_port, - name=name, address=address, new_name=new_name) - - @mcp.tool() - def set_signature(name: str = None, address: str = None, signature: str = "") -> dict: - """Set a function's signature/prototype""" - return set_function_signature(port=current_instance_port, - name=name, address=address, signature=signature) -``` - -Similar tool groups would be created for: -- `data`: Data manipulation tools -- `memory`: Memory reading/writing tools -- `analysis`: Program analysis tools -- `xrefs`: Cross-reference navigation tools -- `symbols`: Symbol management tools -- `variables`: Variable manipulation tools - -### 4. Simplified Instance Management - -Add a "current working instance" pattern: - -```python -# Global state for the current instance -current_instance_port = DEFAULT_GHIDRA_PORT - -# Helper function to get the current instance or validate a specific port -def _get_instance_port(port=None): - port = port or current_instance_port - # Validate that the instance exists and is active - if port not in active_instances: - # Try to register it if not found - register_instance(port) - if port not in active_instances: - raise ValueError(f"No active Ghidra instance on port {port}") - return port - -# All tools would use this helper, falling back to the current instance if no port is specified -def read_memory(address: str, length: int = 16, format: str = "hex", port: int = None) -> dict: - """Read bytes from memory - - Args: - address: Memory address in hex format - length: Number of bytes to read (default: 16) - format: Output format (default: "hex") - port: Specific Ghidra instance port (optional, uses current if omitted) - - Returns: - dict: Memory content in the requested format - """ - port = _get_instance_port(port) - # Rest of implementation... -``` - -## Migration Strategy - -1. Create a new MCP class structure in a separate file -2. Implement resource loaders for key items (functions, data, memory regions) -3. Implement prompt templates for common tasks -4. Organize tools into logical groups by domain concept -5. Add a current instance selection mechanism -6. Update documentation with clear examples of the new patterns -7. Create backward compatibility shims if needed - -## Benefits of This Approach - -1. **Better Discoverability**: Logical grouping helps agents find the right tool -2. **Context Efficiency**: Resources load just what's needed without extra metadata -3. **Streamlined Interaction**: Tools follow consistent patterns with sensible defaults -4. **Prompt Templates**: Common patterns are codified in reusable prompts -5. **More LLM-friendly**: Outputs optimized for consumption by language models - -The refactored API would be easier to use, more efficient, and better aligned with MCP best practices, while maintaining all the current functionality. \ No newline at end of file diff --git a/refactoring_sample.py b/refactoring_sample.py deleted file mode 100644 index 36faada..0000000 --- a/refactoring_sample.py +++ /dev/null @@ -1,1286 +0,0 @@ -# /// script -# requires-python = ">=3.11" -# dependencies = [ -# "mcp==1.6.0", -# "requests==2.32.3", -# ] -# /// -# GhydraMCP Bridge for Ghidra HATEOAS API - Refactored for MCP optimization -# This provides a sample implementation of the refactoring proposal - -import os -import signal -import sys -import threading -import time -from threading import Lock -from typing import Dict, List, Optional, Union, Any -from urllib.parse import quote, urlencode - -import requests -from mcp.server.fastmcp import FastMCP - -# ================= Core Infrastructure ================= - -# Allowed origins for CORS/CSRF protection -ALLOWED_ORIGINS = os.environ.get( - "GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",") - -# Track active Ghidra instances (port -> info dict) -active_instances: Dict[int, dict] = {} -instances_lock = Lock() -DEFAULT_GHIDRA_PORT = 8192 -DEFAULT_GHIDRA_HOST = "localhost" -# Port ranges for scanning -QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) -FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) - -# Version information -BRIDGE_VERSION = "v2.0.0-beta.1" -REQUIRED_API_VERSION = 2 - -# Global state for the current instance -current_instance_port = DEFAULT_GHIDRA_PORT - -instructions = """ -GhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware. - -First, run `instances.discover()` to find open Ghidra instances. Then use `instances.use(port)` to set your working instance. -""" - -mcp = FastMCP("GhydraMCP", version=BRIDGE_VERSION, instructions=instructions) - -ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST) - -# Helper function to get the current instance or validate a specific port -def _get_instance_port(port=None): - """Internal helper to get the current instance port or validate a specific port""" - port = port or current_instance_port - # Validate that the instance exists and is active - if port not in active_instances: - # Try to register it if not found - register_instance(port) - if port not in active_instances: - raise ValueError(f"No active Ghidra instance on port {port}") - return port - -# HTTP request helpers -def get_instance_url(port: int) -> str: - """Get URL for a Ghidra instance by port""" - with instances_lock: - if port in active_instances: - return active_instances[port]["url"] - - if 8192 <= port <= 65535: - register_instance(port) - if port in active_instances: - return active_instances[port]["url"] - - return f"http://{ghidra_host}:{port}" - -def validate_origin(headers: dict) -> bool: - """Validate request origin against allowed origins""" - origin = headers.get("Origin") - if not origin: - # No origin header - allow (browser same-origin policy applies) - return True - - # Parse origin to get scheme+hostname - try: - parsed = urlparse(origin) - origin_base = f"{parsed.scheme}://{parsed.hostname}" - if parsed.port: - origin_base += f":{parsed.port}" - except: - return False - - return origin_base in ALLOWED_ORIGINS - -def _make_request(method: str, port: int, endpoint: str, params: dict = None, - json_data: dict = None, data: str = None, - headers: dict = None) -> dict: - """Internal helper to make HTTP requests and handle common errors.""" - url = f"{get_instance_url(port)}/{endpoint}" - - # Set up headers according to HATEOAS API expected format - request_headers = { - 'Accept': 'application/json', - 'X-Request-ID': f"mcp-bridge-{int(time.time() * 1000)}" - } - - if headers: - request_headers.update(headers) - - is_state_changing = method.upper() in ["POST", "PUT", "PATCH", "DELETE"] - if is_state_changing: - check_headers = json_data.get("headers", {}) if isinstance( - json_data, dict) else (headers or {}) - if not validate_origin(check_headers): - return { - "success": False, - "error": { - "code": "ORIGIN_NOT_ALLOWED", - "message": "Origin not allowed for state-changing request" - }, - "status_code": 403, - "timestamp": int(time.time() * 1000) - } - if json_data is not None: - request_headers['Content-Type'] = 'application/json' - elif data is not None: - request_headers['Content-Type'] = 'text/plain' - - try: - response = requests.request( - method, - url, - params=params, - json=json_data, - data=data, - headers=request_headers, - timeout=10 - ) - - try: - parsed_json = response.json() - - # Add timestamp if not present - if isinstance(parsed_json, dict) and "timestamp" not in parsed_json: - parsed_json["timestamp"] = int(time.time() * 1000) - - # Check for HATEOAS compliant error response format and reformat if needed - if not response.ok and isinstance(parsed_json, dict) and "success" in parsed_json and not parsed_json["success"]: - # Check if error is in the expected HATEOAS format - if "error" in parsed_json and not isinstance(parsed_json["error"], dict): - # Convert string error to the proper format - error_message = parsed_json["error"] - parsed_json["error"] = { - "code": f"HTTP_{response.status_code}", - "message": error_message - } - - return parsed_json - - except ValueError: - if response.ok: - return { - "success": False, - "error": { - "code": "NON_JSON_RESPONSE", - "message": "Received non-JSON success response from Ghidra plugin" - }, - "status_code": response.status_code, - "response_text": response.text[:500], - "timestamp": int(time.time() * 1000) - } - else: - return { - "success": False, - "error": { - "code": f"HTTP_{response.status_code}", - "message": f"Non-JSON error response: {response.text[:100]}..." - }, - "status_code": response.status_code, - "response_text": response.text[:500], - "timestamp": int(time.time() * 1000) - } - - except requests.exceptions.Timeout: - return { - "success": False, - "error": { - "code": "REQUEST_TIMEOUT", - "message": "Request timed out" - }, - "status_code": 408, - "timestamp": int(time.time() * 1000) - } - except requests.exceptions.ConnectionError: - return { - "success": False, - "error": { - "code": "CONNECTION_ERROR", - "message": f"Failed to connect to Ghidra instance at {url}" - }, - "status_code": 503, - "timestamp": int(time.time() * 1000) - } - except Exception as e: - return { - "success": False, - "error": { - "code": "UNEXPECTED_ERROR", - "message": f"An unexpected error occurred: {str(e)}" - }, - "exception": e.__class__.__name__, - "timestamp": int(time.time() * 1000) - } - -def safe_get(port: int, endpoint: str, params: dict = None) -> dict: - """Make GET request to Ghidra instance""" - return _make_request("GET", port, endpoint, params=params) - -def safe_put(port: int, endpoint: str, data: dict) -> dict: - """Make PUT request to Ghidra instance with JSON payload""" - headers = data.pop("headers", None) if isinstance(data, dict) else None - return _make_request("PUT", port, endpoint, json_data=data, headers=headers) - -def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict: - """Perform a POST request to a specific Ghidra instance with JSON or text payload""" - headers = None - json_payload = None - text_payload = None - - if isinstance(data, dict): - headers = data.pop("headers", None) - json_payload = data - else: - text_payload = data - - return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers) - -def safe_patch(port: int, endpoint: str, data: dict) -> dict: - """Perform a PATCH request to a specific Ghidra instance with JSON payload""" - headers = data.pop("headers", None) if isinstance(data, dict) else None - return _make_request("PATCH", port, endpoint, json_data=data, headers=headers) - -def safe_delete(port: int, endpoint: str) -> dict: - """Perform a DELETE request to a specific Ghidra instance""" - return _make_request("DELETE", port, endpoint) - -def simplify_response(response: dict) -> dict: - """ - Simplify HATEOAS response data for easier AI agent consumption - - Removes _links from result entries - - Flattens nested structures when appropriate - - Preserves important metadata - - Converts structured data like disassembly to text for easier consumption - """ - if not isinstance(response, dict): - return response - - # Make a copy to avoid modifying the original - result = response.copy() - - # Store API response metadata - api_metadata = {} - for key in ["id", "instance", "timestamp", "size", "offset", "limit"]: - if key in result: - api_metadata[key] = result.get(key) - - # Simplify the main result data if present - if "result" in result: - # Handle array results - if isinstance(result["result"], list): - simplified_items = [] - for item in result["result"]: - if isinstance(item, dict): - # Store but remove HATEOAS links from individual items - item_copy = item.copy() - links = item_copy.pop("_links", None) - - # Optionally store direct href links as more accessible properties - # This helps AI agents navigate the API without understanding HATEOAS - if isinstance(links, dict): - for link_name, link_data in links.items(): - if isinstance(link_data, dict) and "href" in link_data: - item_copy[f"{link_name}_url"] = link_data["href"] - - simplified_items.append(item_copy) - else: - simplified_items.append(item) - result["result"] = simplified_items - - # Handle object results - elif isinstance(result["result"], dict): - result_copy = result["result"].copy() - - # Store but remove links from result object - links = result_copy.pop("_links", None) - - # Add direct href links for easier navigation - if isinstance(links, dict): - for link_name, link_data in links.items(): - if isinstance(link_data, dict) and "href" in link_data: - result_copy[f"{link_name}_url"] = link_data["href"] - - # Special case for disassembly - convert to text for easier consumption - if "instructions" in result_copy and isinstance(result_copy["instructions"], list): - disasm_text = "" - for instr in result_copy["instructions"]: - if isinstance(instr, dict): - addr = instr.get("address", "") - mnemonic = instr.get("mnemonic", "") - operands = instr.get("operands", "") - bytes_str = instr.get("bytes", "") - - # Format: address: bytes mnemonic operands - disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" - - # Add the text representation while preserving the original structured data - result_copy["disassembly_text"] = disasm_text - - # Special case for decompiled code - make sure it's directly accessible - if "ccode" in result_copy: - result_copy["decompiled_text"] = result_copy["ccode"] - elif "decompiled" in result_copy: - result_copy["decompiled_text"] = result_copy["decompiled"] - - result["result"] = result_copy - - # Store but remove HATEOAS links from the top level - links = result.pop("_links", None) - - # Add direct href links in a more accessible format - if isinstance(links, dict): - api_links = {} - for link_name, link_data in links.items(): - if isinstance(link_data, dict) and "href" in link_data: - api_links[link_name] = link_data["href"] - - # Add simplified links - if api_links: - result["api_links"] = api_links - - # Restore API metadata - for key, value in api_metadata.items(): - if key not in result: - result[key] = value - - return result - -# ================= Legacy Instance Management ================= - -def register_instance(port: int, url: str = None) -> str: - """Register a new Ghidra instance - - Args: - port: Port number of the Ghidra instance - url: Optional URL if different from default http://host:port - - Returns: - str: Confirmation message or error - """ - if url is None: - url = f"http://{ghidra_host}:{port}" - - try: - # Check for HATEOAS API by checking plugin-version endpoint - test_url = f"{url}/plugin-version" - response = requests.get(test_url, timeout=2) - - if not response.ok: - return f"Error: Instance at {url} is not responding properly to HATEOAS API" - - project_info = {"url": url} - - try: - # Check plugin version to ensure compatibility - try: - version_data = response.json() - if "result" in version_data: - result = version_data["result"] - if isinstance(result, dict): - plugin_version = result.get("plugin_version", "") - api_version = result.get("api_version", 0) - - project_info["plugin_version"] = plugin_version - project_info["api_version"] = api_version - - # Verify API version compatibility - if api_version != REQUIRED_API_VERSION: - error_msg = f"API version mismatch: Plugin reports version {api_version}, but bridge requires version {REQUIRED_API_VERSION}" - print(error_msg, file=sys.stderr) - return error_msg - - print(f"Connected to Ghidra plugin version {plugin_version} with API version {api_version}") - except Exception as e: - print(f"Error parsing plugin version: {e}", file=sys.stderr) - - # Get program info from HATEOAS API - info_url = f"{url}/program" - - try: - info_response = requests.get(info_url, timeout=2) - if info_response.ok: - try: - info_data = info_response.json() - if "result" in info_data: - result = info_data["result"] - if isinstance(result, dict): - # Extract project and file from programId (format: "project:/file") - program_id = result.get("programId", "") - if ":" in program_id: - project_name, file_path = program_id.split(":", 1) - project_info["project"] = project_name - # Remove leading slash from file path if present - if file_path.startswith("/"): - file_path = file_path[1:] - project_info["path"] = file_path - - # Get file name directly from the result - project_info["file"] = result.get("name", "") - - # Get other metadata - project_info["language_id"] = result.get("languageId", "") - project_info["compiler_spec_id"] = result.get("compilerSpecId", "") - project_info["image_base"] = result.get("image_base", "") - - # Store _links from result for HATEOAS navigation - if "_links" in result: - project_info["_links"] = result.get("_links", {}) - except Exception as e: - print(f"Error parsing info endpoint: {e}", file=sys.stderr) - except Exception as e: - print(f"Error connecting to info endpoint: {e}", file=sys.stderr) - except Exception: - # Non-critical, continue with registration even if project info fails - pass - - with instances_lock: - active_instances[port] = project_info - - return f"Registered instance on port {port} at {url}" - except Exception as e: - return f"Error: Could not connect to instance at {url}: {str(e)}" - -def _discover_instances(port_range, host=None, timeout=0.5) -> dict: - """Internal function to discover Ghidra instances by scanning ports""" - found_instances = [] - scan_host = host if host is not None else ghidra_host - - for port in port_range: - if port in active_instances: - continue - - url = f"http://{scan_host}:{port}" - try: - # Try HATEOAS API via plugin-version endpoint - test_url = f"{url}/plugin-version" - response = requests.get(test_url, - headers={'Accept': 'application/json', - 'X-Request-ID': f"discovery-{int(time.time() * 1000)}"}, - timeout=timeout) - - if response.ok: - # Further validate it's a GhydraMCP instance by checking response format - try: - json_data = response.json() - if "success" in json_data and json_data["success"] and "result" in json_data: - # Looks like a valid HATEOAS API response - # Instead of relying only on register_instance, which already checks program info, - # extract additional information here for more detailed discovery results - result = register_instance(port, url) - - # Initialize report info - instance_info = { - "port": port, - "url": url - } - - # Extract version info for reporting - if isinstance(json_data["result"], dict): - instance_info["plugin_version"] = json_data["result"].get("plugin_version", "unknown") - instance_info["api_version"] = json_data["result"].get("api_version", "unknown") - else: - instance_info["plugin_version"] = "unknown" - instance_info["api_version"] = "unknown" - - # Include project details from registered instance in the report - if port in active_instances: - instance_info["project"] = active_instances[port].get("project", "") - instance_info["file"] = active_instances[port].get("file", "") - - instance_info["result"] = result - found_instances.append(instance_info) - except (ValueError, KeyError): - # Not a valid JSON response or missing expected keys - print(f"Port {port} returned non-HATEOAS response", file=sys.stderr) - continue - - except requests.exceptions.RequestException: - # Instance not available, just continue - continue - - return { - "found": len(found_instances), - "instances": found_instances - } - -def periodic_discovery(): - """Periodically discover new instances""" - while True: - try: - _discover_instances(FULL_DISCOVERY_RANGE, timeout=0.5) - - with instances_lock: - ports_to_remove = [] - for port, info in active_instances.items(): - url = info["url"] - try: - # Check HATEOAS API via plugin-version endpoint - response = requests.get(f"{url}/plugin-version", timeout=1) - if not response.ok: - ports_to_remove.append(port) - continue - - # Update program info if available (especially to get project name) - try: - info_url = f"{url}/program" - info_response = requests.get(info_url, timeout=1) - if info_response.ok: - try: - info_data = info_response.json() - if "result" in info_data: - result = info_data["result"] - if isinstance(result, dict): - # Extract project and file from programId (format: "project:/file") - program_id = result.get("programId", "") - if ":" in program_id: - project_name, file_path = program_id.split(":", 1) - info["project"] = project_name - # Remove leading slash from file path if present - if file_path.startswith("/"): - file_path = file_path[1:] - info["path"] = file_path - - # Get file name directly from the result - info["file"] = result.get("name", "") - - # Get other metadata - info["language_id"] = result.get("languageId", "") - info["compiler_spec_id"] = result.get("compilerSpecId", "") - info["image_base"] = result.get("image_base", "") - except Exception as e: - print(f"Error parsing info endpoint during discovery: {e}", file=sys.stderr) - except Exception: - # Non-critical, continue even if update fails - pass - - except requests.exceptions.RequestException: - ports_to_remove.append(port) - - for port in ports_to_remove: - del active_instances[port] - print(f"Removed unreachable instance on port {port}") - except Exception as e: - print(f"Error in periodic discovery: {e}") - - time.sleep(30) - -# ================= MCP Resources ================= -# Resources provide information that can be loaded directly into context -# They focus on data and minimize metadata - -@mcp.resource() -def ghidra_instance(port: int = None) -> dict: - """Get detailed information about a Ghidra instance and the loaded program - - Args: - port: Specific Ghidra instance port (optional, uses current if omitted) - - Returns: - dict: Detailed information about the Ghidra instance and loaded program - """ - port = _get_instance_port(port) - response = safe_get(port, "program") - - if not isinstance(response, dict) or not response.get("success", False): - return {"error": f"Unable to access Ghidra instance on port {port}"} - - # Extract only the most relevant information for the resource - result = response.get("result", {}) - - if not isinstance(result, dict): - return {"error": "Invalid response format from Ghidra instance"} - - instance_info = { - "port": port, - "url": get_instance_url(port), - "program_name": result.get("name", "unknown"), - "program_id": result.get("programId", "unknown"), - "language": result.get("languageId", "unknown"), - "compiler": result.get("compilerSpecId", "unknown"), - "base_address": result.get("imageBase", "0x0"), - "memory_size": result.get("memorySize", 0), - "analysis_complete": result.get("analysisComplete", False) - } - - # Add project information if available - if "project" in active_instances[port]: - instance_info["project"] = active_instances[port]["project"] - - return instance_info - -@mcp.resource() -def decompiled_function(name: str = None, address: str = None, port: int = None) -> str: - """Get decompiled C code for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - - Returns: - str: The decompiled C code as a string, or error message - """ - if not name and not address: - return "Error: Either name or address parameter is required" - - port = _get_instance_port(port) - - params = { - "syntax_tree": "false", - "style": "normalize" - } - - if address: - endpoint = f"functions/{address}/decompile" - else: - endpoint = f"functions/by-name/{quote(name)}/decompile" - - response = safe_get(port, endpoint, params) - simplified = simplify_response(response) - - # For a resource, we want to directly return just the decompiled code - if (not isinstance(simplified, dict) or - not simplified.get("success", False) or - "result" not in simplified): - error_message = "Error: Could not decompile function" - if isinstance(simplified, dict) and "error" in simplified: - if isinstance(simplified["error"], dict): - error_message = simplified["error"].get("message", error_message) - else: - error_message = str(simplified["error"]) - return error_message - - # Extract just the decompiled code text - result = simplified["result"] - - # Different endpoints may return the code in different fields, try all of them - if isinstance(result, dict): - for key in ["decompiled_text", "ccode", "decompiled"]: - if key in result: - return result[key] - - return "Error: Could not extract decompiled code from response" - -@mcp.resource() -def function_info(name: str = None, address: str = None, port: int = None) -> dict: - """Get detailed information about a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Complete function information including signature, parameters, etc. - """ - if not name and not address: - return {"error": "Either name or address parameter is required"} - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" - - response = safe_get(port, endpoint) - simplified = simplify_response(response) - - if (not isinstance(simplified, dict) or - not simplified.get("success", False) or - "result" not in simplified): - error = {"error": "Could not get function information"} - if isinstance(simplified, dict) and "error" in simplified: - error["error_details"] = simplified["error"] - return error - - # Return just the function data without API metadata - return simplified["result"] - -@mcp.resource() -def disassembly(name: str = None, address: str = None, port: int = None) -> str: - """Get disassembled instructions for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - - Returns: - str: Formatted disassembly listing as a string - """ - if not name and not address: - return "Error: Either name or address parameter is required" - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}/disassembly" - else: - endpoint = f"functions/by-name/{quote(name)}/disassembly" - - response = safe_get(port, endpoint) - simplified = simplify_response(response) - - if (not isinstance(simplified, dict) or - not simplified.get("success", False) or - "result" not in simplified): - error_message = "Error: Could not get disassembly" - if isinstance(simplified, dict) and "error" in simplified: - if isinstance(simplified["error"], dict): - error_message = simplified["error"].get("message", error_message) - else: - error_message = str(simplified["error"]) - return error_message - - # For a resource, we want to directly return just the disassembly text - result = simplified["result"] - - # Check if we have a disassembly_text field already - if isinstance(result, dict) and "disassembly_text" in result: - return result["disassembly_text"] - - # Otherwise if we have raw instructions, format them ourselves - if isinstance(result, dict) and "instructions" in result and isinstance(result["instructions"], list): - disasm_text = "" - for instr in result["instructions"]: - if isinstance(instr, dict): - addr = instr.get("address", "") - mnemonic = instr.get("mnemonic", "") - operands = instr.get("operands", "") - bytes_str = instr.get("bytes", "") - - # Format: address: bytes mnemonic operands - disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" - - return disasm_text - - # If we have a direct disassembly field, try that as well - if isinstance(result, dict) and "disassembly" in result: - return result["disassembly"] - - return "Error: Could not extract disassembly from response" - -# ================= MCP Prompts ================= -# Prompts define reusable templates for LLM interactions - -@mcp.prompt("analyze_function") -def analyze_function_prompt(name: str = None, address: str = None, port: int = None): - """A prompt to guide the LLM through analyzing a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - """ - port = _get_instance_port(port) - - # Get function name if only address is provided - if address and not name: - fn_info = function_info(address=address, port=port) - if isinstance(fn_info, dict) and "name" in fn_info: - name = fn_info["name"] - - # Create the template that guides analysis - return { - "prompt": f""" - Analyze the following function: {name or address} - - Decompiled code: - ```c - {decompiled_function(name=name, address=address, port=port)} - ``` - - Disassembly: - ``` - {disassembly(name=name, address=address, port=port)} - ``` - - 1. What is the purpose of this function? - 2. What are the key parameters and their uses? - 3. What are the return values and their meanings? - 4. Are there any security concerns in this implementation? - 5. Describe the algorithm or process being implemented. - """, - "context": { - "function_info": function_info(name=name, address=address, port=port) - } - } - -@mcp.prompt("identify_vulnerabilities") -def identify_vulnerabilities_prompt(name: str = None, address: str = None, port: int = None): - """A prompt to help identify potential vulnerabilities in a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) - """ - port = _get_instance_port(port) - - # Get function name if only address is provided - if address and not name: - fn_info = function_info(address=address, port=port) - if isinstance(fn_info, dict) and "name" in fn_info: - name = fn_info["name"] - - # Create the template focused on security analysis - return { - "prompt": f""" - Analyze the following function for security vulnerabilities: {name or address} - - Decompiled code: - ```c - {decompiled_function(name=name, address=address, port=port)} - ``` - - Look for these vulnerability types: - 1. Buffer overflows or underflows - 2. Integer overflow/underflow - 3. Use-after-free or double-free bugs - 4. Format string vulnerabilities - 5. Missing bounds checks - 6. Insecure memory operations - 7. Race conditions or timing issues - 8. Input validation problems - - For each potential vulnerability: - - Describe the vulnerability and where it occurs - - Explain the security impact - - Suggest how it could be exploited - - Recommend a fix - """, - "context": { - "function_info": function_info(name=name, address=address, port=port), - "disassembly": disassembly(name=name, address=address, port=port) - } - } - -# ================= MCP Tool Groups ================= -# Tools are organized into logical domains - -@mcp.tool_group("instances") -class InstanceTools: - """Tools for managing Ghidra instances""" - - @mcp.tool() - def list() -> dict: - """List all active Ghidra instances""" - with instances_lock: - return { - "instances": [ - { - "port": port, - "url": info["url"], - "project": info.get("project", ""), - "file": info.get("file", "") - } - for port, info in active_instances.items() - ] - } - - @mcp.tool() - def discover(host: str = None) -> dict: - """Discover available Ghidra instances by scanning ports - - Args: - host: Optional host to scan (default: configured ghidra_host) - - Returns: - dict: Contains 'found' count and 'instances' list with discovery results - """ - return _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5) - - @mcp.tool() - def register(port: int, url: str = None) -> str: - """Register a new Ghidra instance - - Args: - port: Port number of the Ghidra instance - url: Optional URL if different from default http://host:port - - Returns: - str: Confirmation message or error - """ - return register_instance(port, url) - - @mcp.tool() - def unregister(port: int) -> str: - """Unregister a Ghidra instance - - Args: - port: Port number of the instance to unregister - - Returns: - str: Confirmation message or error - """ - with instances_lock: - if port in active_instances: - del active_instances[port] - return f"Unregistered instance on port {port}" - return f"No instance found on port {port}" - - @mcp.tool() - def use(port: int) -> str: - """Set the current working Ghidra instance - - Args: - port: Port number of the instance to use - - Returns: - str: Confirmation message or error - """ - global current_instance_port - - # First validate that the instance exists and is active - if port not in active_instances: - # Try to register it if not found - register_instance(port) - if port not in active_instances: - return f"Error: No active Ghidra instance found on port {port}" - - # Set as current instance - current_instance_port = port - - # Return information about the selected instance - with instances_lock: - info = active_instances[port] - program = info.get("file", "unknown program") - project = info.get("project", "unknown project") - return f"Now using Ghidra instance on port {port} with {program} in project {project}" - - @mcp.tool() - def current() -> dict: - """Get information about the current working Ghidra instance - - Returns: - dict: Details about the current instance and program - """ - return ghidra_instance(port=current_instance_port) - -@mcp.tool_group("functions") -class FunctionTools: - """Tools for working with functions""" - - @mcp.tool() - def list(offset: int = 0, limit: int = 100, - name_contains: str = None, - name_matches_regex: str = None, - port: int = None) -> dict: - """List functions with filtering and pagination - - Args: - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - name_contains: Substring name filter (case-insensitive) - name_matches_regex: Regex name filter - port: Specific Ghidra instance port (optional) - - Returns: - dict: List of functions with pagination information - """ - port = _get_instance_port(port) - - params = { - "offset": offset, - "limit": limit - } - if name_contains: - params["name_contains"] = name_contains - if name_matches_regex: - params["name_matches_regex"] = name_matches_regex - - response = safe_get(port, "functions", params) - simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified - - @mcp.tool() - def get(name: str = None, address: str = None, port: int = None) -> dict: - """Get detailed information about a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Detailed function information - """ - if not name and not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" - - response = safe_get(port, endpoint) - return simplify_response(response) - - @mcp.tool() - def decompile(name: str = None, address: str = None, - syntax_tree: bool = False, style: str = "normalize", - port: int = None) -> dict: - """Get decompiled code for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - syntax_tree: Include syntax tree (default: False) - style: Decompiler style (default: "normalize") - port: Specific Ghidra instance port (optional) - - Returns: - dict: Contains function information and decompiled code - """ - if not name and not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - params = { - "syntax_tree": str(syntax_tree).lower(), - "style": style - } - - if address: - endpoint = f"functions/{address}/decompile" - else: - endpoint = f"functions/by-name/{quote(name)}/decompile" - - response = safe_get(port, endpoint, params) - simplified = simplify_response(response) - - # For AI consumption, make the decompiled code more directly accessible - if "result" in simplified and isinstance(simplified["result"], dict): - if "decompiled" in simplified["result"]: - simplified["decompiled_code"] = simplified["result"]["decompiled"] - elif "ccode" in simplified["result"]: - simplified["decompiled_code"] = simplified["result"]["ccode"] - elif "decompiled_text" in simplified["result"]: - simplified["decompiled_code"] = simplified["result"]["decompiled_text"] - - return simplified - - @mcp.tool() - def disassemble(name: str = None, address: str = None, port: int = None) -> dict: - """Get disassembly for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Contains function information and disassembly text - """ - if not name and not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}/disassembly" - else: - endpoint = f"functions/by-name/{quote(name)}/disassembly" - - response = safe_get(port, endpoint) - return simplify_response(response) - - @mcp.tool() - def create(address: str, port: int = None) -> dict: - """Create a new function at the specified address - - Args: - address: Memory address in hex format where function starts - port: Specific Ghidra instance port (optional) - - Returns: - dict: Operation result with the created function information - """ - if not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - payload = { - "address": address - } - - response = safe_post(port, "functions", payload) - return simplify_response(response) - - @mcp.tool() - def rename(old_name: str = None, address: str = None, new_name: str = "", port: int = None) -> dict: - """Rename a function - - Args: - old_name: Current function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - new_name: New function name - port: Specific Ghidra instance port (optional) - - Returns: - dict: Operation result with the updated function information - """ - if not (old_name or address) or not new_name: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either old_name or address, and new_name parameters are required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - payload = { - "name": new_name - } - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(old_name)}" - - response = safe_patch(port, endpoint, payload) - return simplify_response(response) - - @mcp.tool() - def set_signature(name: str = None, address: str = None, signature: str = "", port: int = None) -> dict: - """Set function signature/prototype - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - signature: New function signature (e.g., "int func(char *data, int size)") - port: Specific Ghidra instance port (optional) - - Returns: - dict: Operation result with the updated function information - """ - if not (name or address) or not signature: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address, and signature parameters are required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - payload = { - "signature": signature - } - - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" - - response = safe_patch(port, endpoint, payload) - return simplify_response(response) - - @mcp.tool() - def get_variables(name: str = None, address: str = None, port: int = None) -> dict: - """Get variables for a function - - Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with name) - port: Specific Ghidra instance port (optional) - - Returns: - dict: Contains function information and list of variables - """ - if not name and not address: - return { - "success": False, - "error": { - "code": "MISSING_PARAMETER", - "message": "Either name or address parameter is required" - }, - "timestamp": int(time.time() * 1000) - } - - port = _get_instance_port(port) - - if address: - endpoint = f"functions/{address}/variables" - else: - endpoint = f"functions/by-name/{quote(name)}/variables" - - response = safe_get(port, endpoint) - return simplify_response(response) - -# Additional tool groups would be defined here for other domains: -# @mcp.tool_group("data") -# @mcp.tool_group("memory") -# @mcp.tool_group("analysis") -# @mcp.tool_group("xrefs") -# @mcp.tool_group("symbols") -# @mcp.tool_group("variables") - -# ================= Startup ================= - -if __name__ == "__main__": - register_instance(DEFAULT_GHIDRA_PORT, - f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}") - - # Use quick discovery on startup - _discover_instances(QUICK_DISCOVERY_RANGE) - - # Start background discovery thread - discovery_thread = threading.Thread( - target=periodic_discovery, - daemon=True, - name="GhydraMCP-Discovery" - ) - discovery_thread.start() - - signal.signal(signal.SIGINT, handle_sigint) - mcp.run(transport="stdio") \ No newline at end of file diff --git a/run_tests.py b/run_tests.py index 7442c5d..ed29763 100644 --- a/run_tests.py +++ b/run_tests.py @@ -68,24 +68,73 @@ def run_mcp_bridge_tests(): print(f"Error running MCP bridge tests: {str(e)}") return False +def run_data_tests(): + """Run the data operations tests.""" + print_header("Running Data Operations Tests") + + try: + result = subprocess.run( + [sys.executable, "test_data_operations.py"], + capture_output=True, + text=True + ) + + if result.stdout: + print("STDOUT:") + print(result.stdout) + + if result.stderr: + print("STDERR:") + print(result.stderr) + + return result.returncode == 0 + except Exception as e: + print(f"Error running data operations tests: {str(e)}") + return False + +def run_comment_tests(): + """Run the comment functionality tests.""" + print_header("Running Comment Tests") + + try: + result = subprocess.run( + [sys.executable, "test_comments.py"], + capture_output=True, + text=True + ) + + if result.stdout: + print("STDOUT:") + print(result.stdout) + + if result.stderr: + print("STDERR:") + print(result.stderr) + + return result.returncode == 0 + except Exception as e: + print(f"Error running comment tests: {str(e)}") + return False + def run_all_tests(): """Run all tests""" print_header("GhydraMCP Test Suite") - # Run the HTTP API tests + # Run test suites http_api_success = run_http_api_tests() - - # Run the MCP bridge tests mcp_bridge_success = run_mcp_bridge_tests() + data_tests_success = run_data_tests() + comment_tests_success = run_comment_tests() # Print a summary print_header("Test Summary") print(f"HTTP API Tests: {'PASSED' if http_api_success else 'FAILED'}") print(f"MCP Bridge Tests: {'PASSED' if mcp_bridge_success else 'FAILED'}") - print(f"Overall: {'PASSED' if http_api_success and mcp_bridge_success else 'FAILED'}") + print(f"Data Operations Tests: {'PASSED' if data_tests_success else 'FAILED'}") + print(f"Comment Tests: {'PASSED' if comment_tests_success else 'FAILED'}") + print(f"Overall: {'PASSED' if (http_api_success and mcp_bridge_success and data_tests_success and comment_tests_success) else 'FAILED'}") - # Return True if all tests passed, False otherwise - return http_api_success and mcp_bridge_success + return http_api_success and mcp_bridge_success and data_tests_success and comment_tests_success if __name__ == "__main__": # Check if we have the required dependencies @@ -104,9 +153,15 @@ if __name__ == "__main__": elif sys.argv[1] == "--mcp": # Run only the MCP bridge tests success = run_mcp_bridge_tests() + elif sys.argv[1] == "--data": + # Run only the data operations tests + success = run_data_tests() + elif sys.argv[1] == "--comments": + # Run only the comment tests + success = run_comment_tests() else: print(f"Unknown argument: {sys.argv[1]}") - print("Usage: python run_tests.py [--http|--mcp]") + print("Usage: python run_tests.py [--http|--mcp|--data|--comments]") sys.exit(1) else: # Run all tests diff --git a/test_comments.py b/test_comments.py index e77a870..f38722e 100755 --- a/test_comments.py +++ b/test_comments.py @@ -1,6 +1,10 @@ #!/usr/bin/env python3 """ Test script for the comment functionality in GhydraMCP. + +Tests both HTTP API and MCP bridge interfaces for setting and retrieving +different types of comments in Ghidra, including plate, pre, post, EOL, +repeatable, and decompiler comments. """ import json import logging @@ -13,21 +17,23 @@ import requests from mcp.client.session import ClientSession from mcp.client.stdio import StdioServerParameters, stdio_client -# Setup logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("comment_test") -# Direct HTTP test functions def test_http_api_comments(port=8192, address="08000200"): - """Test setting comments directly with HTTP API""" + """ + Test setting and retrieving comments using direct HTTP API. + + Args: + port: Ghidra HTTP API port + address: Memory address for comments + """ logger.info("===== Testing HTTP API Comments =====") base_url = f"http://localhost:{port}" - # Test each comment type comment_types = ["plate", "pre", "post", "eol", "repeatable"] - for i, comment_type in enumerate(comment_types): - # Set comment + for comment_type in comment_types: comment_text = f"TEST {comment_type.upper()} COMMENT {int(time.time())}" logger.info(f"Setting {comment_type} comment: {comment_text}") @@ -40,37 +46,42 @@ def test_http_api_comments(port=8192, address="08000200"): logger.info(f"Response: {r.text}") if r.status_code == 200: - # Get the comment back to verify r_get = requests.get(url, timeout=10) logger.info(f"GET Status code: {r_get.status_code}") logger.info(f"GET Response: {r_get.text}") except Exception as e: logger.error(f"Error setting {comment_type} comment: {e}") -# MCP Bridge test functions async def test_bridge_comments(): - """Test the bridge comment functionality""" + """ + Test MCP bridge comment functionality. + + Sets and clears both plate comments and decompiler comments using the + MCP bridge interface. + """ logger.info("===== Testing MCP Bridge Comments =====") - # Configure the server parameters server_parameters = StdioServerParameters( command=sys.executable, args=["bridge_mcp_hydra.py"], ) - # Connect to the bridge logger.info("Connecting to bridge...") async with stdio_client(server_parameters) as (read_stream, write_stream): - # Create a session logger.info("Creating session...") async with ClientSession(read_stream, write_stream) as session: - # Initialize the session logger.info("Initializing session...") - init_result = await session.initialize() + await session.initialize() + + # First set the current instance + logger.info("Setting current Ghidra instance...") + await session.call_tool( + "instances_use", + arguments={"port": 8192} + ) - # Get a function to test with logger.info("Getting current address...") - addr_result = await session.call_tool("get_current_address", arguments={"port": 8192}) + addr_result = await session.call_tool("ui_get_current_address") addr_data = json.loads(addr_result.content[0].text) if not addr_data.get("success", False): @@ -80,51 +91,46 @@ async def test_bridge_comments(): address = addr_data.get("result", {}).get("address", "08000200") logger.info(f"Using address: {address}") - # Test normal comment - logger.info("Testing set_comment with plate type...") + logger.info("Testing comments_set with plate type...") comment_text = f"MCP PLATE COMMENT {int(time.time())}" - result = await session.call_tool("set_comment", - arguments={"port": 8192, - "address": address, - "comment": comment_text, - "comment_type": "plate"}) - logger.info(f"set_comment result: {result}") + result = await session.call_tool("comments_set", + arguments={"address": address, + "comment": comment_text, + "comment_type": "plate"}) + logger.info(f"comments_set result: {result}") - # Test decompiler comment - logger.info("Testing set_decompiler_comment...") + logger.info("Testing functions_set_comment...") decompiler_comment = f"MCP DECOMPILER COMMENT {int(time.time())}" - decompile_result = await session.call_tool("set_decompiler_comment", - arguments={"port": 8192, - "address": address, - "comment": decompiler_comment}) - logger.info(f"set_decompiler_comment result: {decompile_result}") + decompile_result = await session.call_tool("functions_set_comment", + arguments={"address": address, + "comment": decompiler_comment}) + logger.info(f"functions_set_comment result: {decompile_result}") - # Wait a bit and then clear comments await anyio.sleep(5) - # Clear the comments logger.info("Clearing comments...") - await session.call_tool("set_comment", - arguments={"port": 8192, - "address": address, - "comment": "", - "comment_type": "plate"}) + await session.call_tool("comments_set", + arguments={"address": address, + "comment": "", + "comment_type": "plate"}) - await session.call_tool("set_decompiler_comment", - arguments={"port": 8192, - "address": address, - "comment": ""}) + await session.call_tool("functions_set_comment", + arguments={"address": address, + "comment": ""}) def main(): - """Main entry point""" + """ + Main entry point for comment tests. + + Runs both HTTP API and MCP bridge tests sequentially. + """ try: - # First test HTTP API directly test_http_api_comments() - - # Then test through MCP bridge anyio.run(test_bridge_comments) + logger.info("All comment tests completed successfully") + return True except Exception as e: - logger.error(f"Error: {e}") + logger.error(f"Error in comment tests: {e}") sys.exit(1) if __name__ == "__main__": diff --git a/test_data_create.py b/test_data_create.py deleted file mode 100644 index 455d67f..0000000 --- a/test_data_create.py +++ /dev/null @@ -1,135 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to verify the create_data function works properly. -""" -import json -import logging -import sys -import requests -import time - -# Setup logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("create_data_test") - -def wait_for_program_loaded(): - """Wait for a Ghidra program to be loaded.""" - for _ in range(10): # Try for ~20 seconds - try: - response = requests.get("http://localhost:8192/program") - if response.status_code == 200: - data = json.loads(response.text) - if data.get("success", False): - logger.info("Program loaded: " + data["result"]["name"]) - return True - except Exception as e: - logger.warning(f"Error checking program status: {e}") - - logger.info("Waiting for program to load...") - time.sleep(2) - - logger.error("Timed out waiting for program to load") - return False - -def test_create_data(): - """Test creating data at different addresses with different types.""" - # First wait for a program to be loaded - if not wait_for_program_loaded(): - logger.error("No program loaded, cannot test create_data") - return False - - # First get the memory map to find addresses where we can create data - try: - response = requests.get("http://localhost:8192/memory") - memory_info = json.loads(response.text) - - # Get valid addresses from an existing memory region - memory_blocks = memory_info.get("result", []) - - # Find a valid memory block - valid_addresses = [] - for block in memory_blocks: - if "start" in block and "name" in block: - # Get starting address of a RAM block - if "RAM" in block["name"].upper(): - # Use the first 10 bytes of this RAM block - addr_base = int(block["start"], 16) - for i in range(10): - valid_addresses.append(f"{addr_base + i:08x}") - break - - # If no RAM blocks, try any memory block - if not valid_addresses: - for block in memory_blocks: - if "start" in block: - # Use the first 10 bytes of this block - addr_base = int(block["start"], 16) - for i in range(10): - valid_addresses.append(f"{addr_base + i:08x}") - break - - # Fallback to known addresses if still nothing - if not valid_addresses: - valid_addresses = ["08000100", "08000104", "08000108", "0800010c", - "08000110", "08000114", "08000118", "0800011c"] - - logger.info(f"Will try using addresses: {valid_addresses[:3]}...") - addresses = valid_addresses - except Exception as e: - logger.error(f"Error getting memory map: {e}") - # Fallback to some addresses that might be valid - addresses = ["08000100", "08000104", "08000108", "0800010c", - "08000110", "08000114", "08000118", "0800011c"] - - # Try data types - types_to_try = ["uint32_t", "int", "float", "byte", "char", "word", "dword", "string"] - - success_count = 0 - - for i, data_type in enumerate(types_to_try): - address = addresses[i % len(addresses)] - logger.info(f"Testing data type: {data_type} at address {address}") - - # First try direct HTTP API - url = f"http://localhost:8192/data" - payload = { - "address": address, - "type": data_type, - "newName": f"TEST_{data_type.upper()}" - } - - # Add size for string types - if data_type.lower() == "string": - payload["size"] = 16 - - try: - response = requests.post(url, json=payload) - logger.info(f"HTTP API - Status: {response.status_code}") - logger.info(f"HTTP API - Response: {response.text}") - if response.status_code == 200 and json.loads(response.text).get("success", False): - success_count += 1 - logger.info(f"HTTP API - Success with data type {data_type}") - else: - logger.warning(f"HTTP API - Failed with data type {data_type}") - except Exception as e: - logger.error(f"HTTP API - Error: {e}") - - # Short delay between tests - time.sleep(0.5) - - return success_count > 0 - -def main(): - try: - result = test_create_data() - if result: - logger.info("Test successful!") - else: - logger.error("All test data types failed") - sys.exit(1) - except Exception as e: - logger.error(f"Unexpected error: {e}") - sys.exit(1) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_data_delete.py b/test_data_delete.py deleted file mode 100644 index 69666ee..0000000 --- a/test_data_delete.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to verify the delete_data functionality works properly. -""" -import json -import logging -import sys -import requests -import time - -# Setup logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("delete_data_test") - -def test_delete_data(): - """Test deleting data.""" - # First create data at a specific address - test_address = "08000100" # This should be a valid address in the memory map - test_type = "byte" - - # Step 1: Create some data - logger.info(f"Creating test data at {test_address}") - create_url = "http://localhost:8192/data" - create_payload = { - "address": test_address, - "type": test_type, - "newName": "TEST_DELETE_ME" - } - - try: - create_response = requests.post(create_url, json=create_payload) - logger.info(f"Create response: {create_response.status_code}") - logger.info(f"Create response: {create_response.text}") - - create_success = create_response.status_code == 200 and json.loads(create_response.text).get("success", False) - - if not create_success: - logger.warning("Failed to create test data, test may fail") - except Exception as e: - logger.error(f"Error creating test data: {e}") - - # Short delay - time.sleep(1) - - # Step 2: Delete the data - logger.info(f"Deleting data at {test_address}") - delete_url = "http://localhost:8192/data/delete" - delete_payload = { - "address": test_address, - "action": "delete" - } - - try: - delete_response = requests.post(delete_url, json=delete_payload) - logger.info(f"Delete response: {delete_response.status_code}") - logger.info(f"Delete response: {delete_response.text}") - - # Check if successful - if delete_response.status_code == 200: - response_data = json.loads(delete_response.text) - if response_data.get("success", False): - logger.info("Successfully deleted data!") - return True - - logger.warning("Failed to delete data") - return False - except Exception as e: - logger.error(f"Error deleting data: {e}") - return False - -def main(): - """Main entry point.""" - try: - result = test_delete_data() - if result: - logger.info("Test successful!") - else: - logger.error("Test failed") - sys.exit(1) - except Exception as e: - logger.error(f"Unexpected error: {e}") - sys.exit(1) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_data_operations.py b/test_data_operations.py index 54fb9cd..9fe8d51 100755 --- a/test_data_operations.py +++ b/test_data_operations.py @@ -1,126 +1,417 @@ #!/usr/bin/env python3 """ -Test script for data operations in GhydraMCP bridge. -This script tests renaming and changing data types. +Comprehensive test script for data operations in GhydraMCP. + +This script tests all data-related operations including: +1. Creating data items with different types +2. Renaming data items +3. Updating data types +4. Deleting data items +5. Reading memory + +Tests are performed using both direct HTTP API and MCP bridge interfaces. """ import json import logging import sys import time +import requests +import anyio +from typing import Dict, Any from urllib.parse import quote -import anyio from mcp.client.session import ClientSession from mcp.client.stdio import StdioServerParameters, stdio_client -# Setup logging +# Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("data_test") -async def test_data_operations(): - """Test data operations using the MCP client""" - # Configure the server parameters +# Configure default test values +GHIDRA_PORT = 8192 +DEFAULT_MEMORY_ADDRESS = "08000200" # Fallback test address + +def wait_for_program_loaded(port=GHIDRA_PORT, timeout=20): + """Wait for a Ghidra program to be loaded.""" + for _ in range(timeout // 2): + try: + response = requests.get(f"http://localhost:{port}/program") + if response.status_code == 200: + data = json.loads(response.text) + if data.get("success", False): + logger.info(f"Program loaded: {data['result']['name']}") + return True + except Exception as e: + logger.warning(f"Error checking program status: {e}") + + logger.info("Waiting for program to load...") + time.sleep(2) + + logger.error("Timed out waiting for program to load") + return False + +def find_valid_addresses(port=GHIDRA_PORT) -> list: + """Find valid memory addresses for testing by checking memory map.""" + try: + response = requests.get(f"http://localhost:{port}/memory") + memory_info = json.loads(response.text) + + memory_blocks = memory_info.get("result", []) + valid_addresses = [] + + # First try to find a RAM block + for block in memory_blocks: + if "start" in block and "name" in block and "RAM" in block["name"].upper(): + addr_base = int(block["start"], 16) + for i in range(10): + valid_addresses.append(f"{addr_base + i*4:08x}") + return valid_addresses + + # If no RAM blocks, try any memory block + for block in memory_blocks: + if "start" in block: + addr_base = int(block["start"], 16) + for i in range(10): + valid_addresses.append(f"{addr_base + i*4:08x}") + return valid_addresses + + except Exception as e: + logger.error(f"Error getting memory map: {e}") + + # Fallback addresses if cannot determine from memory map + return ["08000100", "08000104", "08000108", "0800010c", "08000110"] + +def test_http_data_create(): + """Test creating data items with different types using HTTP API.""" + if not wait_for_program_loaded(): + return False + + addresses = find_valid_addresses() + if not addresses: + logger.error("No valid addresses found for data creation test") + return False + + types_to_try = ["uint", "int", "uint *", "int *", "byte", "word", "dword", "pointer"] + success_count = 0 + + for i, data_type in enumerate(types_to_try): + address = addresses[i % len(addresses)] + logger.info(f"Testing data type: {data_type} at address {address}") + + url = f"http://localhost:{GHIDRA_PORT}/data" + payload = { + "address": address, + "type": data_type, + "newName": f"TEST_{data_type.upper()}" + } + + # Add size for string types + if data_type.lower() == "string": + payload["size"] = 16 + + try: + response = requests.post(url, json=payload) + logger.info(f"Status: {response.status_code}") + logger.info(f"Response: {response.text}") + if response.status_code == 200 and json.loads(response.text).get("success", False): + success_count += 1 + logger.info(f"Success with data type {data_type}") + except Exception as e: + logger.error(f"Error: {e}") + + time.sleep(0.5) + + return success_count > 0 + +def test_http_data_rename(): + """Test data rename operations using HTTP API.""" + addresses = find_valid_addresses() + if not addresses: + return False + + test_address = addresses[0] + test_name = f"TEST_RENAME_{int(time.time())}" + + # First create a data item to rename + create_url = f"http://localhost:{GHIDRA_PORT}/data" + create_payload = { + "address": test_address, + "type": "int", + "newName": "TEST_BEFORE_RENAME" + } + + try: + create_response = requests.post(create_url, json=create_payload) + if create_response.status_code != 200: + logger.warning("Failed to create test data for rename test") + return False + + # Rename the data + rename_payload = { + "address": test_address, + "newName": test_name + } + + rename_response = requests.post(create_url, json=rename_payload) + logger.info(f"Rename response: {rename_response.status_code}") + logger.info(f"Rename response: {rename_response.text}") + + return rename_response.status_code == 200 and json.loads(rename_response.text).get("success", False) + except Exception as e: + logger.error(f"Error in rename test: {e}") + return False + +def test_http_data_type_change(): + """Test changing data type using HTTP API.""" + addresses = find_valid_addresses() + if not addresses: + return False + + test_address = addresses[1] + + # First create a data item + create_url = f"http://localhost:{GHIDRA_PORT}/data" + create_payload = { + "address": test_address, + "type": "uint", + "newName": "TEST_TYPE_CHANGE" + } + + try: + create_response = requests.post(create_url, json=create_payload) + if create_response.status_code != 200: + logger.warning("Failed to create test data for type change test") + return False + + # Change the type + type_url = f"http://localhost:{GHIDRA_PORT}/data/type" + type_payload = { + "address": test_address, + "type": "byte" + } + + type_response = requests.post(type_url, json=type_payload) + logger.info(f"Type change response: {type_response.status_code}") + logger.info(f"Type change response: {type_response.text}") + + return type_response.status_code == 200 and json.loads(type_response.text).get("success", False) + except Exception as e: + logger.error(f"Error in type change test: {e}") + return False + +def test_http_data_delete(): + """Test deleting data using HTTP API.""" + addresses = find_valid_addresses() + if not addresses: + return False + + test_address = addresses[2] + + # First create a data item to delete + create_url = f"http://localhost:{GHIDRA_PORT}/data" + create_payload = { + "address": test_address, + "type": "int", + "newName": "TEST_DELETE_ME" + } + + try: + create_response = requests.post(create_url, json=create_payload) + if create_response.status_code != 200: + logger.warning("Failed to create test data for delete test") + return False + + # Delete the data + delete_url = f"http://localhost:{GHIDRA_PORT}/data/delete" + delete_payload = { + "address": test_address, + "action": "delete" + } + + delete_response = requests.post(delete_url, json=delete_payload) + logger.info(f"Delete response: {delete_response.status_code}") + logger.info(f"Delete response: {delete_response.text}") + + return delete_response.status_code == 200 and json.loads(delete_response.text).get("success", False) + except Exception as e: + logger.error(f"Error in delete test: {e}") + return False + +def test_http_combined_operations(): + """Test data operations that update both name and type together.""" + addresses = find_valid_addresses() + if not addresses: + return False + + test_address = addresses[3] + + # First create a data item + create_url = f"http://localhost:{GHIDRA_PORT}/data" + create_payload = { + "address": test_address, + "type": "int", + "newName": "TEST_COMBINED_ORIG" + } + + try: + create_response = requests.post(create_url, json=create_payload) + if create_response.status_code != 200: + logger.warning("Failed to create test data for combined update test") + return False + + # Update both name and type in one operation + update_url = f"http://localhost:{GHIDRA_PORT}/data" + update_payload = { + "address": test_address, + "newName": "TEST_COMBINED_NEW", + "type": "uint" + } + + update_response = requests.post(update_url, json=update_payload) + logger.info(f"Combined update response: {update_response.status_code}") + logger.info(f"Combined update response: {update_response.text}") + + return update_response.status_code == 200 and json.loads(update_response.text).get("success", False) + except Exception as e: + logger.error(f"Error in combined update test: {e}") + return False + +async def test_mcp_data_operations(): + """Test data operations using the MCP bridge.""" server_parameters = StdioServerParameters( command=sys.executable, args=["bridge_mcp_hydra.py"], ) - # Connect to the bridge - logger.info("Connecting to bridge...") + logger.info("Connecting to MCP bridge...") async with stdio_client(server_parameters) as (read_stream, write_stream): - # Create a session - logger.info("Creating session...") async with ClientSession(read_stream, write_stream) as session: - # Initialize the session logger.info("Initializing session...") - init_result = await session.initialize() - logger.info(f"Initialization result: {init_result}") + await session.initialize() - # List data to find a data item to test with - logger.info("Listing data...") - list_data_result = await session.call_tool( - "list_data_items", - arguments={"port": 8192, "limit": 5} + # First set the current instance + logger.info("Setting current Ghidra instance...") + await session.call_tool( + "instances_use", + arguments={"port": 8192} ) - list_data_data = json.loads(list_data_result.content[0].text) - logger.info(f"List data result: {list_data_data}") - if "result" not in list_data_data or not list_data_data.get("result"): - logger.error("No data items found - cannot proceed with test") - return + # Get a valid address to work with + addresses = find_valid_addresses() + test_address = addresses[4] if addresses and len(addresses) > 4 else DEFAULT_MEMORY_ADDRESS - # Get the first data item for testing - data_item = list_data_data["result"][0] - data_address = data_item.get("address") - original_name = data_item.get("label") + logger.info(f"Using address {test_address} for MCP data operations test") - if not data_address: - logger.error("No address found in data item - cannot proceed with test") - return + # Test data_create + try: + logger.info("Testing data_create...") + create_result = await session.call_tool( + "data_create", + arguments={"address": test_address, "data_type": "uint"} + ) + create_data = json.loads(create_result.content[0].text) + assert create_data.get("success", False), "data_create failed" + logger.info("data_create passed") - logger.info(f"Testing with data at address {data_address}, original name: {original_name}") - - # Test renaming the data - test_name = f"TEST_DATA_{int(time.time())}" - logger.info(f"Renaming data to {test_name}") - - rename_result = await session.call_tool( - "update_data", - arguments={"port": 8192, "address": data_address, "name": test_name} - ) - - rename_data = json.loads(rename_result.content[0].text) - logger.info(f"Rename result: {rename_data}") - - if not rename_data.get("success", False): - logger.error(f"Failed to rename data: {rename_data.get('error', {}).get('message', 'Unknown error')}") - else: - logger.info("Data renamed successfully") - - # Test changing the data type - test_type = "uint32_t *" # Pointer to uint32_t - adjust as needed for your test data - logger.info(f"Changing data type to {test_type}") - - type_result = await session.call_tool( - "update_data", - arguments={"port": 8192, "address": data_address, "data_type": test_type} - ) - - type_data = json.loads(type_result.content[0].text) - logger.info(f"Change type result: {type_data}") - - if not type_data.get("success", False): - logger.error(f"Failed to change data type: {type_data.get('error', {}).get('message', 'Unknown error')}") - else: - logger.info("Data type changed successfully") - - # Test both operations together - logger.info(f"Restoring original name and trying different type") - - combined_result = await session.call_tool( - "update_data", - arguments={ - "port": 8192, - "address": data_address, - "name": original_name, - "data_type": "uint32_t" - } - ) - - combined_data = json.loads(combined_result.content[0].text) - logger.info(f"Combined update result: {combined_data}") - - if not combined_data.get("success", False): - logger.error(f"Failed to perform combined update: {combined_data.get('error', {}).get('message', 'Unknown error')}") - else: - logger.info("Combined update successful") + # Test data_rename + logger.info("Testing data_rename...") + test_name = f"MCP_TEST_{int(time.time())}" + rename_result = await session.call_tool( + "data_rename", + arguments={"address": test_address, "name": test_name} + ) + rename_data = json.loads(rename_result.content[0].text) + assert rename_data.get("success", False), "data_rename failed" + logger.info("data_rename passed") + + # Test data_set_type + logger.info("Testing data_set_type...") + set_type_result = await session.call_tool( + "data_set_type", + arguments={"address": test_address, "data_type": "byte"} + ) + set_type_data = json.loads(set_type_result.content[0].text) + assert set_type_data.get("success", False), "data_set_type failed" + logger.info("data_set_type passed") + + # Test memory_read on the data + logger.info("Testing memory_read...") + read_result = await session.call_tool( + "memory_read", + arguments={"address": test_address, "length": 4} + ) + read_data = json.loads(read_result.content[0].text) + assert read_data.get("success", False), "memory_read failed" + assert "hexBytes" in read_data, "memory_read response missing hexBytes" + logger.info("memory_read passed") + + # Test data_delete + logger.info("Testing data_delete...") + delete_result = await session.call_tool( + "data_delete", + arguments={"address": test_address} + ) + delete_data = json.loads(delete_result.content[0].text) + assert delete_data.get("success", False), "data_delete failed" + logger.info("data_delete passed") + + logger.info("All MCP data operations passed") + return True + + except Exception as e: + logger.error(f"Error in MCP data operations test: {e}") + # Try to clean up + try: + await session.call_tool("data_delete", arguments={"address": test_address}) + except: + pass + return False def main(): - """Main entry point""" + """Main entry point for data operations tests.""" + all_passed = True + try: - anyio.run(test_data_operations) + # Run HTTP API tests + logger.info("===== Testing HTTP API Data Operations =====") + + logger.info("----- Testing data creation -----") + create_result = test_http_data_create() + logger.info(f"Data creation test: {'PASSED' if create_result else 'FAILED'}") + all_passed = all_passed and create_result + + logger.info("----- Testing data rename -----") + rename_result = test_http_data_rename() + logger.info(f"Data rename test: {'PASSED' if rename_result else 'FAILED'}") + all_passed = all_passed and rename_result + + logger.info("----- Testing data type change -----") + type_result = test_http_data_type_change() + logger.info(f"Data type change test: {'PASSED' if type_result else 'FAILED'}") + all_passed = all_passed and type_result + + logger.info("----- Testing data delete -----") + delete_result = test_http_data_delete() + logger.info(f"Data delete test: {'PASSED' if delete_result else 'FAILED'}") + all_passed = all_passed and delete_result + + logger.info("----- Testing combined operations -----") + combined_result = test_http_combined_operations() + logger.info(f"Combined operations test: {'PASSED' if combined_result else 'FAILED'}") + all_passed = all_passed and combined_result + + # Run MCP bridge tests + logger.info("===== Testing MCP Bridge Data Operations =====") + mcp_result = anyio.run(test_mcp_data_operations) + logger.info(f"MCP data operations test: {'PASSED' if mcp_result else 'FAILED'}") + all_passed = all_passed and mcp_result + + logger.info(f"Overall data operations test: {'PASSED' if all_passed else 'FAILED'}") + if not all_passed: + sys.exit(1) + except Exception as e: - logger.error(f"Error: {e}") + logger.error(f"Unexpected error in data tests: {e}") sys.exit(1) if __name__ == "__main__": diff --git a/test_data_simple.py b/test_data_simple.py deleted file mode 100755 index 0b6ad08..0000000 --- a/test_data_simple.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python3 -""" -Direct test for data operations. -""" -import json -import logging -import sys -import requests - -# Setup logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("simple_test") - -def test_create_data(): - address = "08000000" - - # Try data types - types_to_try = ["uint32_t", "int", "dword", "byte", "pointer"] - - for data_type in types_to_try: - logger.info(f"Testing data type: {data_type}") - - url = f"http://localhost:8192/data" - payload = { - "address": address, - "type": data_type, - "newName": f"TEST_{data_type.upper()}" # Include a name for the data - } - - try: - response = requests.post(url, json=payload) - logger.info(f"Status: {response.status_code}") - logger.info(f"Response: {response.text}") - if response.status_code == 200: - logger.info(f"Success with data type {data_type}") - return True - except Exception as e: - logger.error(f"Error: {e}") - - return False - -def main(): - try: - result = test_create_data() - if result: - logger.info("Test successful!") - else: - logger.error("All test data types failed") - except Exception as e: - logger.error(f"Unexpected error: {e}") - sys.exit(1) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_data_type.py b/test_data_type.py deleted file mode 100755 index 1184971..0000000 --- a/test_data_type.py +++ /dev/null @@ -1,105 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script for setting data types in GhydraMCP bridge. -""" -import json -import logging -import sys -import time -from urllib.parse import quote - -import anyio -from mcp.client.session import ClientSession -from mcp.client.stdio import StdioServerParameters, stdio_client - -# Setup logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("data_type_test") - -async def test_set_data_type(): - """Test the set_data_type tool""" - # Configure the server parameters - server_parameters = StdioServerParameters( - command=sys.executable, - args=["bridge_mcp_hydra.py"], - ) - - # Connect to the bridge - logger.info("Connecting to bridge...") - async with stdio_client(server_parameters) as (read_stream, write_stream): - # Create a session - logger.info("Creating session...") - async with ClientSession(read_stream, write_stream) as session: - # Initialize the session - logger.info("Initializing session...") - init_result = await session.initialize() - logger.info(f"Initialization result: {init_result}") - - # List tools to make sure our new tool is available - logger.info("Listing tools...") - tools_result = await session.list_tools() - tool_data = json.loads(tools_result.content[0].text) if tools_result.content else None - - tools = tool_data.get("tools", []) if tool_data else [] - tool_names = [t.get("name") for t in tools] - logger.info(f"Available tools: {tool_names}") - - if "set_data_type" not in tool_names: - logger.error("set_data_type tool not found!") - return - - # List data to find a data item to test with - logger.info("Listing data...") - list_data_result = await session.call_tool( - "list_data_items", - arguments={"port": 8192, "limit": 5} - ) - list_data_data = json.loads(list_data_result.content[0].text) - - if "result" not in list_data_data or not list_data_data.get("result"): - logger.error("No data items found - cannot proceed with test") - return - - # Get the first data item for testing - data_item = list_data_data["result"][0] - data_address = data_item.get("address") - original_type = data_item.get("dataType") - - if not data_address: - logger.error("No address found in data item - cannot proceed with test") - return - - logger.info(f"Testing with data at address {data_address}, original type: {original_type}") - - # Test with simple types first - simple_tests = ["uint32_t", "int", "byte", "word", "dword"] - - for test_type in simple_tests: - logger.info(f"Testing type: {test_type}") - set_type_result = await session.call_tool( - "set_data_type", - arguments={"port": 8192, "address": data_address, "data_type": test_type} - ) - - try: - set_type_data = json.loads(set_type_result.content[0].text) - logger.info(f"Result: {set_type_data}") - - if set_type_data.get("success", False): - logger.info(f"Successfully set type to {test_type}") - break - else: - logger.warning(f"Failed to set type to {test_type}: {set_type_data.get('error', {}).get('message', 'Unknown error')}") - except Exception as e: - logger.error(f"Error processing result: {e}") - -def main(): - """Main entry point""" - try: - anyio.run(test_set_data_type) - except Exception as e: - logger.error(f"Error: {e}") - sys.exit(1) - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/test_data_update.py b/test_data_update.py deleted file mode 100755 index 6f46e95..0000000 --- a/test_data_update.py +++ /dev/null @@ -1,181 +0,0 @@ -#!/usr/bin/env python3 -""" -Dedicated test script for the GhydraMCP data handling API. - -This script has standalone tests to validate the three key data manipulation operations: -1. Rename only - Change the name without changing the data type -2. Type change only - Change the data type while preserving the name -3. Update both - Change both name and type simultaneously - -These tests operate on a low level and can be run independently of the main test suite -to diagnose issues with the API's data handling capabilities. - -Usage: - python test_data_update.py -""" -import json -import requests -import sys -import argparse - -BASE_URL = "http://localhost:8192" - -def test_data_update(verbose=True, base_url=None): - """Test data update operations - - Args: - verbose: Whether to print detailed output - base_url: Base URL for the Ghidra HTTP API (default: http://localhost:8192) - - Returns: - bool: True if all tests pass, False otherwise - """ - if base_url: - global BASE_URL - BASE_URL = base_url - - # Track test results - all_tests_passed = True - - # First find a suitable data item to test with - if verbose: - print("Fetching data items...") - response = requests.get(f"{BASE_URL}/data?limit=1") - - if response.status_code != 200: - print(f"Error: Failed to fetch data items, status {response.status_code}") - print(response.text) - return False - - data = response.json() - if not data.get("success"): - print(f"Error: API call failed: {data.get('error', 'Unknown error')}") - return False - - # Extract address from first data item - result = data.get("result", []) - if not result or not isinstance(result, list) or not result[0].get("address"): - print("Error: No data items found or invalid response format") - if result and verbose: - print(f"Result: {json.dumps(result, indent=2)}") - return False - - address = result[0]["address"] - if verbose: - print(f"Using data item at address: {address}") - - # Test 1: Renaming only - if verbose: - print("\n--- Test 1: Rename Only ---") - test_name = "TEST_DATA_RENAME" - payload = { - "address": address, - "newName": test_name - } - - if verbose: - print(f"Request: POST {BASE_URL}/data") - print(f"Payload: {json.dumps(payload, indent=2)}") - - response = requests.post(f"{BASE_URL}/data", json=payload) - if verbose: - print(f"Status: {response.status_code}") - print(f"Response: {json.dumps(response.json(), indent=2)}") - - # Check Test 1 results - test1_passed = response.status_code == 200 and response.json().get("success") - if not test1_passed: - print(f"ERROR: Test 1 (Rename Only) failed: {response.status_code}") - all_tests_passed = False - - # Test 2: Type change only - if verbose: - print("\n--- Test 2: Type Change Only ---") - payload = { - "address": address, - "type": "int" # Using 'type' as parameter name - } - - if verbose: - print(f"Request: POST {BASE_URL}/data/type") - print(f"Payload: {json.dumps(payload, indent=2)}") - - response = requests.post(f"{BASE_URL}/data/type", json=payload) - if verbose: - print(f"Status: {response.status_code}") - print(f"Response: {json.dumps(response.json(), indent=2)}") - - # Check Test 2 results - test2_passed = response.status_code == 200 and response.json().get("success") - if not test2_passed: - print(f"ERROR: Test 2 (Type Change Only) failed: {response.status_code}") - all_tests_passed = False - - # Test 3: Both name and type change - if verbose: - print("\n--- Test 3: Both Name and Type Change ---") - payload = { - "address": address, - "newName": "TEST_DATA_BOTH", - "type": "byte" # Using 'type' as parameter name - } - - if verbose: - print(f"Request: POST {BASE_URL}/data/update") - print(f"Payload: {json.dumps(payload, indent=2)}") - - response = requests.post(f"{BASE_URL}/data/update", json=payload) - if verbose: - print(f"Status: {response.status_code}") - print(f"Response: {json.dumps(response.json(), indent=2)}") - - # Check Test 3 results - test3_passed = response.status_code == 200 and response.json().get("success") - if not test3_passed: - print(f"ERROR: Test 3 (Both Name and Type Change via /data/update) failed: {response.status_code}") - all_tests_passed = False - - # Test 4: Direct raw request using the /data endpoint - if verbose: - print("\n--- Test 4: Direct Request to /data endpoint ---") - payload = { - "address": address, - "newName": "TEST_DIRECT_UPDATE", - "type": "int" # Using 'type' parameter name - } - - if verbose: - print(f"Request: POST {BASE_URL}/data") - print(f"Payload: {json.dumps(payload, indent=2)}") - - response = requests.post(f"{BASE_URL}/data", json=payload) - if verbose: - print(f"Status: {response.status_code}") - print(f"Response: {json.dumps(response.json(), indent=2)}") - - # Check Test 4 results - test4_passed = response.status_code == 200 and response.json().get("success") - if not test4_passed: - print(f"ERROR: Test 4 (Both Name and Type Change via /data) failed: {response.status_code}") - all_tests_passed = False - - # Print summary - if verbose: - print("\n--- Test Summary ---") - print(f"Test 1 (Rename Only): {'PASSED' if test1_passed else 'FAILED'}") - print(f"Test 2 (Type Change Only): {'PASSED' if test2_passed else 'FAILED'}") - print(f"Test 3 (Both Name and Type Change via /data/update): {'PASSED' if test3_passed else 'FAILED'}") - print(f"Test 4 (Both Name and Type Change via /data): {'PASSED' if test4_passed else 'FAILED'}") - print(f"Overall: {'ALL TESTS PASSED' if all_tests_passed else 'SOME TESTS FAILED'}") - - return all_tests_passed - -if __name__ == "__main__": - parser = argparse.ArgumentParser(description="Test data operations in the GhydraMCP HTTP API") - parser.add_argument("--quiet", "-q", action="store_true", help="Suppress detailed output") - parser.add_argument("--url", "-u", help="Base URL for the Ghidra HTTP API") - args = parser.parse_args() - - success = test_data_update(not args.quiet, args.url) - if not success: - sys.exit(1) \ No newline at end of file