refactor: clean up python comments and consolidate data test files

This commit is contained in:
Teal Bauer 2025-04-15 12:16:42 +02:00
parent 89fa811284
commit f04223d23a
14 changed files with 498 additions and 4154 deletions

View File

@ -1,150 +0,0 @@
# GhydraMCP Bridge API Documentation
## Overview
This document describes the MCP tools and resources exposed by the GhydraMCP bridge that connects to Ghidra's HTTP API. The bridge provides a higher-level interface optimized for AI agent usage.
## Core Concepts
- Each Ghidra instance runs its own HTTP server (default port 8192)
- The bridge discovers and manages multiple Ghidra instances
- Programs are addressed by their unique identifier within Ghidra (`project:/path/to/file`).
- The primary identifier for a program is its Ghidra path, e.g., `myproject:/path/to/mybinary.exe`.
- The bridge must keep track of which plugin host and port has which project & file and route accordingly
- Tools are organized by resource type (programs, functions, data, etc.)
- Consistent response format with success/error indicators
## Instance Management Tools
### `list_instances`
List all active Ghidra instances with their ports and project info.
### `discover_instances`
Scan for available Ghidra instances by port range.
### `register_instance`
Manually register a Ghidra instance by port/URL.
## Program Analysis Tools
### `list_functions`
List functions in current program with pagination.
### `get_function`
Get details and decompilation for a function by name.
### `get_function_by_address`
Get function details by memory address.
### `decompile_function_by_address`
Decompile function at specific address.
### `list_segments`
List memory segments/sections in program.
### `list_data_items`
List defined data items in program.
### `read_memory`
Read bytes from memory at address. Parameters:
- `address`: Hex address
- `length`: Bytes to read
- `format`: "hex", "base64" or "string" output format
### `write_memory`
Write bytes to memory at address (use with caution). Parameters:
- `address`: Hex address
- `bytes`: Data to write
- `format`: "hex", "base64" or "string" input format
### `list_variables`
List global variables with search/filter.
## Modification Tools
### `update_function`
Rename a function.
### `update_data`
Rename data at memory address.
### `set_function_prototype`
Change a function's signature.
### `rename_local_variable`
Rename variable within function.
### `set_local_variable_type`
Change variable's data type.
## Response Format
All tools return responses in this format:
```json
{
"id": "request-id",
"instance": "http://host:port",
"success": true/false,
"result": {...}, // Tool-specific data
"error": { // Only on failure
"code": "...",
"message": "..."
},
"_links": { // HATEOAS links
"self": {"href": "/path"},
"related": {"href": "/other"}
}
}
```
## Example Usage
1. Discover available instances:
```python
discover_instances()
```
2. List functions in first instance:
```python
list_functions(port=8192, limit=10)
```
3. Decompile main function:
```python
get_function(port=8192, name="main")
```
4. Rename a function:
```python
update_function(port=8192, name="FUN_1234", new_name="parse_data")
```
## Error Handling
- Check `success` field first
- On failure, `error` contains details
- Common error codes:
- `INSTANCE_NOT_FOUND`
- `RESOURCE_NOT_FOUND`
- `INVALID_PARAMETER`
- `TRANSACTION_FAILED`
## Advanced Analysis Tools
### `list_xrefs`
List cross-references between code/data. Parameters:
- `to_addr`: Filter refs to this address
- `from_addr`: Filter refs from this address
- `type`: Filter by ref type ("CALL", "READ", etc)
- Basic pagination via `offset`/`limit`
### `analyze_program`
Run Ghidra analysis with optional settings:
- `analysis_options`: Dict of analysis passes to enable
### `get_callgraph`
Get function call graph visualization data:
- `function`: Starting function (defaults to entry point)
- `max_depth`: Maximum call depth (default: 3)
### `get_dataflow`
Perform data flow analysis from address:
- `address`: Starting point in hex
- `direction`: "forward" or "backward"
- `max_steps`: Max analysis steps

View File

@ -5,8 +5,8 @@
# "requests==2.32.3", # "requests==2.32.3",
# ] # ]
# /// # ///
# GhydraMCP Bridge for Ghidra HATEOAS API - Refactored for MCP optimization # GhydraMCP Bridge for Ghidra HATEOAS API - Optimized for MCP integration
# This provides a revised implementation with namespaced tools # Provides namespaced tools for interacting with Ghidra's reverse engineering capabilities
import os import os
import signal import signal
import sys import sys
@ -21,24 +21,19 @@ from mcp.server.fastmcp import FastMCP
# ================= Core Infrastructure ================= # ================= Core Infrastructure =================
# Allowed origins for CORS/CSRF protection
ALLOWED_ORIGINS = os.environ.get( ALLOWED_ORIGINS = os.environ.get(
"GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",") "GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",")
# Track active Ghidra instances (port -> info dict)
active_instances: Dict[int, dict] = {} active_instances: Dict[int, dict] = {}
instances_lock = Lock() instances_lock = Lock()
DEFAULT_GHIDRA_PORT = 8192 DEFAULT_GHIDRA_PORT = 8192
DEFAULT_GHIDRA_HOST = "localhost" DEFAULT_GHIDRA_HOST = "localhost"
# Port ranges for scanning
QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10)
FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20)
# Version information
BRIDGE_VERSION = "v2.0.0-beta.1" BRIDGE_VERSION = "v2.0.0-beta.1"
REQUIRED_API_VERSION = 2 REQUIRED_API_VERSION = 2
# Global state for the current instance
current_instance_port = DEFAULT_GHIDRA_PORT current_instance_port = DEFAULT_GHIDRA_PORT
instructions = """ instructions = """

125
error.tmp
View File

@ -1,125 +0,0 @@
╭────────────────────────────────────────────────────────────────────────────────────── Traceback (most recent call last) ───────────────────────────────────────────────────────────────────────────────────────╮
│ /Users/teal/.asdf/installs/python/3.11.1/lib/python3.11/site-packages/mcp/cli/cli.py:236 in dev │
│ │
│ 233 │ ╭────────────────────────────────── locals ──────────────────────────────────╮ │
│ 234 │ try: │ file = PosixPath('/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py') │ │
│ 235 │ │ # Import server to get dependencies │ file_spec = 'bridge_mcp_hydra.py' │ │
│ ❱ 236 │ │ server = _import_server(file, server_object) │ server_object = None │ │
│ 237 │ │ if hasattr(server, "dependencies"): │ with_editable = None │ │
│ 238 │ │ │ with_packages = list(set(with_packages + server.dependencies)) │ with_packages = [] │ │
│ 239 ╰────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Users/teal/.asdf/installs/python/3.11.1/lib/python3.11/site-packages/mcp/cli/cli.py:142 in _import_server │
│ │
│ 139 │ │ sys.exit(1) │
│ 140 │ │
│ 141 │ module = importlib.util.module_from_spec(spec) │
│ ❱ 142 │ spec.loader.exec_module(module) │
│ 143 │ │
│ 144 │ # If no object specified, try common server names │
│ 145 │ if not server_object: │
│ │
│ ╭─────────────────────────────────────────────────────────────────────────────────────── locals ───────────────────────────────────────────────────────────────────────────────────────╮ │
│ │ file = PosixPath('/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py') │ │
│ │ file_dir = '/Users/teal/src/GhydraMCP' │ │
│ │ module = <module 'server_module' from '/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py'> │ │
│ │ server_object = None │ │
│ │ spec = ModuleSpec(name='server_module', loader=<_frozen_importlib_external.SourceFileLoader object at 0x102ed0750>, origin='/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py') │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ in exec_module:940 │
│ ╭─────────────────────────────────────────────────── locals ───────────────────────────────────────────────────╮ │
│ │ code = <code object <module> at 0x159961400, file "/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py", line 1> │ │
│ │ module = <module 'server_module' from '/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py'> │ │
│ │ self = <_frozen_importlib_external.SourceFileLoader object at 0x102ed0750> │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ in _call_with_frames_removed:241 │
│ ╭───────────────────────────────────────────────────────────────────────────────────────────── locals ─────────────────────────────────────────────────────────────────────────────────────────────╮ │
│ │ args = ( │ │
│ │ │ <code object <module> at 0x159961400, file "/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py", line 1>, │ │
│ │ │ { │ │
│ │ │ │ '__name__': 'server_module', │ │
│ │ │ │ '__doc__': None, │ │
│ │ │ │ '__package__': '', │ │
│ │ │ │ '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x102ed0750>, │ │
│ │ │ │ '__spec__': ModuleSpec(name='server_module', loader=<_frozen_importlib_external.SourceFileLoader object at 0x102ed0750>, origin='/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py'), │ │
│ │ │ │ '__file__': '/Users/teal/src/GhydraMCP/bridge_mcp_hydra.py', │ │
│ │ │ │ '__cached__': '/Users/teal/src/GhydraMCP/__pycache__/bridge_mcp_hydra.cpython-311.pyc', │ │
│ │ │ │ '__builtins__': { │ │
│ │ │ │ │ '__name__': 'builtins', │ │
│ │ │ │ │ '__doc__': 'Built-in functions, exceptions, and other objects.\n\nNoteworthy: None is the `nil'+46, │ │
│ │ │ │ │ '__package__': '', │ │
│ │ │ │ │ '__loader__': <class '_frozen_importlib.BuiltinImporter'>, │ │
│ │ │ │ │ '__spec__': ModuleSpec(name='builtins', loader=<class '_frozen_importlib.BuiltinImporter'>, origin='built-in'), │ │
│ │ │ │ │ '__build_class__': <built-in function __build_class__>, │ │
│ │ │ │ │ '__import__': <built-in function __import__>, │ │
│ │ │ │ │ 'abs': <built-in function abs>, │ │
│ │ │ │ │ 'all': <built-in function all>, │ │
│ │ │ │ │ 'any': <built-in function any>, │ │
│ │ │ │ │ ... +147 │ │
│ │ │ │ }, │ │
│ │ │ │ '__annotations__': {'active_instances': typing.Dict[int, dict]}, │ │
│ │ │ │ 'os': <module 'os' (frozen)>, │ │
│ │ │ │ ... +42 │ │
│ │ │ } │ │
│ │ ) │ │
│ │ f = <built-in function exec> │ │
│ │ kwds = {} │ │
│ ╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
│ │
│ /Users/teal/src/GhydraMCP/bridge_mcp_hydra.py:583 in <module> │
│ │
│ 580 # Resources provide information that can be loaded directly into context │
│ 581 # They focus on data and minimize metadata │
│ 582 │
│ ❱ 583 @mcp.resource() │
│ 584 def ghidra_instance(port: int = None) -> dict: │
│ 585 │ """Get detailed information about a Ghidra instance and the loaded program │
│ 586 │
│ │
│ ╭─────────────────────────────────────────────────────────────────── locals ────────────────────────────────────────────────────────────────────╮ │
│ │ _discover_instances = <function _discover_instances at 0x1030600e0> │ │
│ │ _get_instance_port = <function _get_instance_port at 0x102eb7100> │ │
│ │ _make_request = <function _make_request at 0x103047880> │ │
│ │ active_instances = {} │ │
│ │ ALLOWED_ORIGINS = ['http://localhost'] │ │
│ │ Any = typing.Any │ │
│ │ BRIDGE_VERSION = 'v2.0.0-beta.1' │ │
│ │ current_instance_port = 8192 │ │
│ │ DEFAULT_GHIDRA_HOST = 'localhost' │ │
│ │ DEFAULT_GHIDRA_PORT = 8192 │ │
│ │ Dict = typing.Dict │ │
│ │ FastMCP = <class 'mcp.server.fastmcp.server.FastMCP'> │ │
│ │ FULL_DISCOVERY_RANGE = range(8192, 8212) │ │
│ │ get_instance_url = <function get_instance_url at 0x102d8e160> │ │
│ │ ghidra_host = 'localhost' │ │
│ │ handle_sigint = <function handle_sigint at 0x103060220> │ │
│ │ instances_lock = <unlocked _thread.lock object at 0x102ed2e40> │ │
│ │ instructions = '\nGhydraMCP allows interacting with multiple Ghidra SRE instances. Ghidra SRE is '+497 │ │
│ │ List = typing.List │ │
│ │ Lock = <built-in function allocate_lock> │ │
│ │ mcp = <mcp.server.fastmcp.server.FastMCP object at 0x102ed3e10> │ │
│ │ Optional = typing.Optional │ │
│ │ os = <module 'os' (frozen)> │ │
│ │ periodic_discovery = <function periodic_discovery at 0x103060180> │ │
│ │ QUICK_DISCOVERY_RANGE = range(8192, 8202) │ │
│ │ quote = <function quote at 0x1001dc4a0> │ │
│ │ register_instance = <function register_instance at 0x103060040> │ │
│ │ requests = <module 'requests' from '/Users/teal/.asdf/installs/python/3.11.1/lib/python3.11/site-packages/requests/__init__.py'> │ │
│ │ REQUIRED_API_VERSION = 2 │ │
│ │ safe_delete = <function safe_delete at 0x103047ec0> │ │
│ │ safe_get = <function safe_get at 0x103047c40> │ │
│ │ safe_patch = <function safe_patch at 0x103047e20> │ │
│ │ safe_post = <function safe_post at 0x103047d80> │ │
│ │ safe_put = <function safe_put at 0x103047ce0> │ │
│ │ signal = <module 'signal' from '/Users/teal/.asdf/installs/python/3.11.1/lib/python3.11/signal.py'> │ │
│ │ simplify_response = <function simplify_response at 0x103047f60> │ │
│ │ sys = <module 'sys' (built-in)> │ │
│ │ threading = <module 'threading' from '/Users/teal/.asdf/installs/python/3.11.1/lib/python3.11/threading.py'> │ │
│ │ time = <module 'time' (built-in)> │ │
│ │ Union = typing.Union │ │
│ │ urlencode = <function urlencode at 0x1005cac00> │ │
│ │ urlparse = <function urlparse at 0x1005c9760> │ │
│ │ validate_origin = <function validate_origin at 0x1030477e0> │ │
│ ╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯ │
╰────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
TypeError: FastMCP.resource() missing 1 required positional argument: 'uri'

File diff suppressed because it is too large Load Diff

View File

@ -1,261 +0,0 @@
# GhydraMCP Bridge Refactoring Proposal
## Current Issues
The current bridge implementation exposes all functionality as MCP tools, which creates several problems:
1. **Discoverability**: With dozens of tool functions, it's difficult for AI agents to identify the correct tool to use for a specific task.
2. **Consistency**: The API surface is large and not organized by conceptual resources, making it harder to understand what's related.
3. **Context Loading**: Many operations require repeated loading of program information that could be provided more efficiently as resources.
4. **Default Selection**: The current approach requires explicit port selection for each operation, instead of following a "current working instance" pattern.
## Proposed MCP-Oriented Refactoring
Restructure the bridge to follow MCP patterns more closely:
### 1. Resources (for Context Loading)
Resources provide information that can be loaded directly into the LLM's context.
```python
@mcp.resource()
def ghidra_instance(port: int = None) -> dict:
"""Get information about a Ghidra instance or the current working instance
Args:
port: Specific Ghidra instance port (optional, uses current if omitted)
Returns:
dict: Detailed information about the Ghidra instance and loaded program
"""
# Implementation that gets instance info and the current program details
# from the currently selected "working" instance or a specific port
```
```python
@mcp.resource()
def decompiled_function(name: str = None, address: str = None) -> str:
"""Get decompiled C code for a function
Args:
name: Function name (mutually exclusive with address)
address: Function address in hex format (mutually exclusive with name)
Returns:
str: The decompiled C code as a string
"""
# Implementation that only returns the decompiled text directly
```
```python
@mcp.resource()
def function_info(name: str = None, address: str = None) -> dict:
"""Get detailed information about a function
Args:
name: Function name (mutually exclusive with address)
address: Function address in hex format (mutually exclusive with name)
Returns:
dict: Complete function information including signature, parameters, etc.
"""
# Implementation that returns detailed function information
```
```python
@mcp.resource()
def disassembly(name: str = None, address: str = None) -> str:
"""Get disassembled instructions for a function
Args:
name: Function name (mutually exclusive with address)
address: Function address in hex format (mutually exclusive with name)
Returns:
str: Formatted disassembly listing as a string
"""
# Implementation that returns formatted text disassembly
```
### 2. Prompts (for Interaction Patterns)
Prompts define reusable templates for LLM interactions, making common workflows easier.
```python
@mcp.prompt("analyze_function")
def analyze_function_prompt(name: str = None, address: str = None):
"""A prompt that guides the LLM through analyzing a function's purpose
Args:
name: Function name (mutually exclusive with address)
address: Function address in hex format (mutually exclusive with name)
"""
# Implementation returns a prompt template with decompiled code and disassembly
# that helps the LLM systematically analyze a function
return {
"prompt": f"""
Analyze the following function: {name or address}
Decompiled code:
```c
{decompiled_function(name=name, address=address)}
```
Disassembly:
```
{disassembly(name=name, address=address)}
```
1. What is the purpose of this function?
2. What are the key parameters and their uses?
3. What are the return values and their meanings?
4. Are there any security concerns in this implementation?
5. Describe the algorithm or process being implemented.
""",
"context": {
"function_info": function_info(name=name, address=address)
}
}
```
```python
@mcp.prompt("identify_vulnerabilities")
def identify_vulnerabilities_prompt(name: str = None, address: str = None):
"""A prompt that helps the LLM identify potential vulnerabilities in a function
Args:
name: Function name (mutually exclusive with address)
address: Function address in hex format (mutually exclusive with name)
"""
# Implementation returns a prompt focused on finding security issues
```
### 3. Tools (for Function Selection)
Tools are organized by domain concepts rather than just mirroring the low-level API.
```python
@mcp.tool_group("instances")
class InstanceTools:
@mcp.tool()
def list() -> dict:
"""List all active Ghidra instances"""
return list_instances()
@mcp.tool()
def discover() -> dict:
"""Discover available Ghidra instances"""
return discover_instances()
@mcp.tool()
def register(port: int, url: str = None) -> str:
"""Register a new Ghidra instance"""
return register_instance(port, url)
@mcp.tool()
def use(port: int) -> str:
"""Set the current working Ghidra instance"""
# Implementation that sets the default instance
global current_instance_port
current_instance_port = port
return f"Now using Ghidra instance on port {port}"
```
```python
@mcp.tool_group("functions")
class FunctionTools:
@mcp.tool()
def list(offset: int = 0, limit: int = 100, **filters) -> dict:
"""List functions with filtering and pagination"""
# Implementation that uses the current instance
return list_functions(port=current_instance_port, offset=offset, limit=limit, **filters)
@mcp.tool()
def get(name: str = None, address: str = None) -> dict:
"""Get detailed information about a function"""
return get_function(port=current_instance_port, name=name, address=address)
@mcp.tool()
def create(address: str) -> dict:
"""Create a new function at the specified address"""
return create_function(port=current_instance_port, address=address)
@mcp.tool()
def rename(name: str = None, address: str = None, new_name: str = "") -> dict:
"""Rename a function"""
return rename_function(port=current_instance_port,
name=name, address=address, new_name=new_name)
@mcp.tool()
def set_signature(name: str = None, address: str = None, signature: str = "") -> dict:
"""Set a function's signature/prototype"""
return set_function_signature(port=current_instance_port,
name=name, address=address, signature=signature)
```
Similar tool groups would be created for:
- `data`: Data manipulation tools
- `memory`: Memory reading/writing tools
- `analysis`: Program analysis tools
- `xrefs`: Cross-reference navigation tools
- `symbols`: Symbol management tools
- `variables`: Variable manipulation tools
### 4. Simplified Instance Management
Add a "current working instance" pattern:
```python
# Global state for the current instance
current_instance_port = DEFAULT_GHIDRA_PORT
# Helper function to get the current instance or validate a specific port
def _get_instance_port(port=None):
port = port or current_instance_port
# Validate that the instance exists and is active
if port not in active_instances:
# Try to register it if not found
register_instance(port)
if port not in active_instances:
raise ValueError(f"No active Ghidra instance on port {port}")
return port
# All tools would use this helper, falling back to the current instance if no port is specified
def read_memory(address: str, length: int = 16, format: str = "hex", port: int = None) -> dict:
"""Read bytes from memory
Args:
address: Memory address in hex format
length: Number of bytes to read (default: 16)
format: Output format (default: "hex")
port: Specific Ghidra instance port (optional, uses current if omitted)
Returns:
dict: Memory content in the requested format
"""
port = _get_instance_port(port)
# Rest of implementation...
```
## Migration Strategy
1. Create a new MCP class structure in a separate file
2. Implement resource loaders for key items (functions, data, memory regions)
3. Implement prompt templates for common tasks
4. Organize tools into logical groups by domain concept
5. Add a current instance selection mechanism
6. Update documentation with clear examples of the new patterns
7. Create backward compatibility shims if needed
## Benefits of This Approach
1. **Better Discoverability**: Logical grouping helps agents find the right tool
2. **Context Efficiency**: Resources load just what's needed without extra metadata
3. **Streamlined Interaction**: Tools follow consistent patterns with sensible defaults
4. **Prompt Templates**: Common patterns are codified in reusable prompts
5. **More LLM-friendly**: Outputs optimized for consumption by language models
The refactored API would be easier to use, more efficient, and better aligned with MCP best practices, while maintaining all the current functionality.

File diff suppressed because it is too large Load Diff

View File

@ -68,24 +68,73 @@ def run_mcp_bridge_tests():
print(f"Error running MCP bridge tests: {str(e)}") print(f"Error running MCP bridge tests: {str(e)}")
return False return False
def run_data_tests():
    """Run the data operations test suite and report whether it passed.

    Invokes test_data_operations.py as a subprocess with the current
    interpreter, echoes its captured output, and returns True only when
    the subprocess exits with status 0.
    """
    print_header("Running Data Operations Tests")
    try:
        proc = subprocess.run(
            [sys.executable, "test_data_operations.py"],
            capture_output=True,
            text=True,
        )
        # Echo whatever the child process produced, stream by stream.
        for label, output in (("STDOUT:", proc.stdout), ("STDERR:", proc.stderr)):
            if output:
                print(label)
                print(output)
        return proc.returncode == 0
    except Exception as e:
        # Broad catch on purpose: a failure to even launch the tests is
        # reported as a test failure rather than crashing the runner.
        print(f"Error running data operations tests: {str(e)}")
        return False
def run_comment_tests():
    """Run the comment functionality test suite and report whether it passed.

    Invokes test_comments.py as a subprocess with the current interpreter,
    echoes its captured output, and returns True only when the subprocess
    exits with status 0.
    """
    print_header("Running Comment Tests")
    try:
        proc = subprocess.run(
            [sys.executable, "test_comments.py"],
            capture_output=True,
            text=True,
        )
        # Echo whatever the child process produced, stream by stream.
        for label, output in (("STDOUT:", proc.stdout), ("STDERR:", proc.stderr)):
            if output:
                print(label)
                print(output)
        return proc.returncode == 0
    except Exception as e:
        # Broad catch on purpose: a failure to even launch the tests is
        # reported as a test failure rather than crashing the runner.
        print(f"Error running comment tests: {str(e)}")
        return False
def run_all_tests(): def run_all_tests():
"""Run all tests""" """Run all tests"""
print_header("GhydraMCP Test Suite") print_header("GhydraMCP Test Suite")
# Run the HTTP API tests # Run test suites
http_api_success = run_http_api_tests() http_api_success = run_http_api_tests()
# Run the MCP bridge tests
mcp_bridge_success = run_mcp_bridge_tests() mcp_bridge_success = run_mcp_bridge_tests()
data_tests_success = run_data_tests()
comment_tests_success = run_comment_tests()
# Print a summary # Print a summary
print_header("Test Summary") print_header("Test Summary")
print(f"HTTP API Tests: {'PASSED' if http_api_success else 'FAILED'}") print(f"HTTP API Tests: {'PASSED' if http_api_success else 'FAILED'}")
print(f"MCP Bridge Tests: {'PASSED' if mcp_bridge_success else 'FAILED'}") print(f"MCP Bridge Tests: {'PASSED' if mcp_bridge_success else 'FAILED'}")
print(f"Overall: {'PASSED' if http_api_success and mcp_bridge_success else 'FAILED'}") print(f"Data Operations Tests: {'PASSED' if data_tests_success else 'FAILED'}")
print(f"Comment Tests: {'PASSED' if comment_tests_success else 'FAILED'}")
print(f"Overall: {'PASSED' if (http_api_success and mcp_bridge_success and data_tests_success and comment_tests_success) else 'FAILED'}")
# Return True if all tests passed, False otherwise return http_api_success and mcp_bridge_success and data_tests_success and comment_tests_success
return http_api_success and mcp_bridge_success
if __name__ == "__main__": if __name__ == "__main__":
# Check if we have the required dependencies # Check if we have the required dependencies
@ -104,9 +153,15 @@ if __name__ == "__main__":
elif sys.argv[1] == "--mcp": elif sys.argv[1] == "--mcp":
# Run only the MCP bridge tests # Run only the MCP bridge tests
success = run_mcp_bridge_tests() success = run_mcp_bridge_tests()
elif sys.argv[1] == "--data":
# Run only the data operations tests
success = run_data_tests()
elif sys.argv[1] == "--comments":
# Run only the comment tests
success = run_comment_tests()
else: else:
print(f"Unknown argument: {sys.argv[1]}") print(f"Unknown argument: {sys.argv[1]}")
print("Usage: python run_tests.py [--http|--mcp]") print("Usage: python run_tests.py [--http|--mcp|--data|--comments]")
sys.exit(1) sys.exit(1)
else: else:
# Run all tests # Run all tests

View File

@ -1,6 +1,10 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Test script for the comment functionality in GhydraMCP. Test script for the comment functionality in GhydraMCP.
Tests both HTTP API and MCP bridge interfaces for setting and retrieving
different types of comments in Ghidra, including plate, pre, post, EOL,
repeatable, and decompiler comments.
""" """
import json import json
import logging import logging
@ -13,21 +17,23 @@ import requests
from mcp.client.session import ClientSession from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client from mcp.client.stdio import StdioServerParameters, stdio_client
# Setup logging
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("comment_test") logger = logging.getLogger("comment_test")
# Direct HTTP test functions
def test_http_api_comments(port=8192, address="08000200"): def test_http_api_comments(port=8192, address="08000200"):
"""Test setting comments directly with HTTP API""" """
Test setting and retrieving comments using direct HTTP API.
Args:
port: Ghidra HTTP API port
address: Memory address for comments
"""
logger.info("===== Testing HTTP API Comments =====") logger.info("===== Testing HTTP API Comments =====")
base_url = f"http://localhost:{port}" base_url = f"http://localhost:{port}"
# Test each comment type
comment_types = ["plate", "pre", "post", "eol", "repeatable"] comment_types = ["plate", "pre", "post", "eol", "repeatable"]
for i, comment_type in enumerate(comment_types): for comment_type in comment_types:
# Set comment
comment_text = f"TEST {comment_type.upper()} COMMENT {int(time.time())}" comment_text = f"TEST {comment_type.upper()} COMMENT {int(time.time())}"
logger.info(f"Setting {comment_type} comment: {comment_text}") logger.info(f"Setting {comment_type} comment: {comment_text}")
@ -40,37 +46,42 @@ def test_http_api_comments(port=8192, address="08000200"):
logger.info(f"Response: {r.text}") logger.info(f"Response: {r.text}")
if r.status_code == 200: if r.status_code == 200:
# Get the comment back to verify
r_get = requests.get(url, timeout=10) r_get = requests.get(url, timeout=10)
logger.info(f"GET Status code: {r_get.status_code}") logger.info(f"GET Status code: {r_get.status_code}")
logger.info(f"GET Response: {r_get.text}") logger.info(f"GET Response: {r_get.text}")
except Exception as e: except Exception as e:
logger.error(f"Error setting {comment_type} comment: {e}") logger.error(f"Error setting {comment_type} comment: {e}")
# MCP Bridge test functions
async def test_bridge_comments(): async def test_bridge_comments():
"""Test the bridge comment functionality""" """
Test MCP bridge comment functionality.
Sets and clears both plate comments and decompiler comments using the
MCP bridge interface.
"""
logger.info("===== Testing MCP Bridge Comments =====") logger.info("===== Testing MCP Bridge Comments =====")
# Configure the server parameters
server_parameters = StdioServerParameters( server_parameters = StdioServerParameters(
command=sys.executable, command=sys.executable,
args=["bridge_mcp_hydra.py"], args=["bridge_mcp_hydra.py"],
) )
# Connect to the bridge
logger.info("Connecting to bridge...") logger.info("Connecting to bridge...")
async with stdio_client(server_parameters) as (read_stream, write_stream): async with stdio_client(server_parameters) as (read_stream, write_stream):
# Create a session
logger.info("Creating session...") logger.info("Creating session...")
async with ClientSession(read_stream, write_stream) as session: async with ClientSession(read_stream, write_stream) as session:
# Initialize the session
logger.info("Initializing session...") logger.info("Initializing session...")
init_result = await session.initialize() await session.initialize()
# First set the current instance
logger.info("Setting current Ghidra instance...")
await session.call_tool(
"instances_use",
arguments={"port": 8192}
)
# Get a function to test with
logger.info("Getting current address...") logger.info("Getting current address...")
addr_result = await session.call_tool("get_current_address", arguments={"port": 8192}) addr_result = await session.call_tool("ui_get_current_address")
addr_data = json.loads(addr_result.content[0].text) addr_data = json.loads(addr_result.content[0].text)
if not addr_data.get("success", False): if not addr_data.get("success", False):
@ -80,51 +91,46 @@ async def test_bridge_comments():
address = addr_data.get("result", {}).get("address", "08000200") address = addr_data.get("result", {}).get("address", "08000200")
logger.info(f"Using address: {address}") logger.info(f"Using address: {address}")
# Test normal comment logger.info("Testing comments_set with plate type...")
logger.info("Testing set_comment with plate type...")
comment_text = f"MCP PLATE COMMENT {int(time.time())}" comment_text = f"MCP PLATE COMMENT {int(time.time())}"
result = await session.call_tool("set_comment", result = await session.call_tool("comments_set",
arguments={"port": 8192, arguments={"address": address,
"address": address,
"comment": comment_text, "comment": comment_text,
"comment_type": "plate"}) "comment_type": "plate"})
logger.info(f"set_comment result: {result}") logger.info(f"comments_set result: {result}")
# Test decompiler comment logger.info("Testing functions_set_comment...")
logger.info("Testing set_decompiler_comment...")
decompiler_comment = f"MCP DECOMPILER COMMENT {int(time.time())}" decompiler_comment = f"MCP DECOMPILER COMMENT {int(time.time())}"
decompile_result = await session.call_tool("set_decompiler_comment", decompile_result = await session.call_tool("functions_set_comment",
arguments={"port": 8192, arguments={"address": address,
"address": address,
"comment": decompiler_comment}) "comment": decompiler_comment})
logger.info(f"set_decompiler_comment result: {decompile_result}") logger.info(f"functions_set_comment result: {decompile_result}")
# Wait a bit and then clear comments
await anyio.sleep(5) await anyio.sleep(5)
# Clear the comments
logger.info("Clearing comments...") logger.info("Clearing comments...")
await session.call_tool("set_comment", await session.call_tool("comments_set",
arguments={"port": 8192, arguments={"address": address,
"address": address,
"comment": "", "comment": "",
"comment_type": "plate"}) "comment_type": "plate"})
await session.call_tool("set_decompiler_comment", await session.call_tool("functions_set_comment",
arguments={"port": 8192, arguments={"address": address,
"address": address,
"comment": ""}) "comment": ""})
def main(): def main():
"""Main entry point""" """
try: Main entry point for comment tests.
# First test HTTP API directly
test_http_api_comments()
# Then test through MCP bridge Runs both HTTP API and MCP bridge tests sequentially.
"""
try:
test_http_api_comments()
anyio.run(test_bridge_comments) anyio.run(test_bridge_comments)
logger.info("All comment tests completed successfully")
return True
except Exception as e: except Exception as e:
logger.error(f"Error: {e}") logger.error(f"Error in comment tests: {e}")
sys.exit(1) sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,135 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify the create_data function works properly.
"""
import json
import logging
import sys
import requests
import time
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("create_data_test")
def wait_for_program_loaded(attempts=10, delay=2):
    """Wait for a Ghidra program to be loaded.

    Polls the plugin's ``/program`` endpoint until it reports success.

    Args:
        attempts: Number of polling attempts before giving up.
        delay: Seconds to sleep between attempts.

    Returns:
        True if a program is loaded within the timeout, False otherwise.
    """
    for _ in range(attempts):  # defaults give ~20 seconds total
        try:
            response = requests.get("http://localhost:8192/program")
            if response.status_code == 200:
                data = response.json()
                if data.get("success", False):
                    logger.info(f"Program loaded: {data['result']['name']}")
                    return True
        except Exception as e:
            # The server may not be up yet; keep polling.
            logger.warning(f"Error checking program status: {e}")
        logger.info("Waiting for program to load...")
        time.sleep(delay)
    logger.error("Timed out waiting for program to load")
    return False
def _find_test_addresses():
    """Discover usable test addresses from the program's memory map.

    Prefers the first bytes of a RAM block, falls back to any block with
    a start address, and finally to a hard-coded list if the memory map
    cannot be read at all.

    Returns:
        A list of hex address strings (no "0x" prefix).
    """
    fallback = ["08000100", "08000104", "08000108", "0800010c",
                "08000110", "08000114", "08000118", "0800011c"]
    try:
        response = requests.get("http://localhost:8192/memory")
        memory_blocks = response.json().get("result", [])
    except Exception as e:
        logger.error(f"Error getting memory map: {e}")
        return fallback

    def first_bytes(block):
        # Use the first 10 bytes of the block as candidate addresses.
        base = int(block["start"], 16)
        return [f"{base + i:08x}" for i in range(10)]

    # Prefer a RAM block if one exists.
    for block in memory_blocks:
        if "start" in block and "RAM" in block.get("name", "").upper():
            return first_bytes(block)
    # Otherwise use the first block that has a start address.
    for block in memory_blocks:
        if "start" in block:
            return first_bytes(block)
    return fallback


def test_create_data():
    """Test creating data at different addresses with different types.

    Returns:
        True if at least one data type was created successfully.
    """
    # A program must be open before data can be created.
    if not wait_for_program_loaded():
        logger.error("No program loaded, cannot test create_data")
        return False
    addresses = _find_test_addresses()
    logger.info(f"Will try using addresses: {addresses[:3]}...")
    types_to_try = ["uint32_t", "int", "float", "byte", "char", "word", "dword", "string"]
    success_count = 0
    for i, data_type in enumerate(types_to_try):
        address = addresses[i % len(addresses)]
        logger.info(f"Testing data type: {data_type} at address {address}")
        payload = {
            "address": address,
            "type": data_type,
            "newName": f"TEST_{data_type.upper()}",
        }
        # String types need an explicit size.
        if data_type.lower() == "string":
            payload["size"] = 16
        try:
            response = requests.post("http://localhost:8192/data", json=payload)
            logger.info(f"HTTP API - Status: {response.status_code}")
            logger.info(f"HTTP API - Response: {response.text}")
            if response.status_code == 200 and response.json().get("success", False):
                success_count += 1
                logger.info(f"HTTP API - Success with data type {data_type}")
            else:
                logger.warning(f"HTTP API - Failed with data type {data_type}")
        except Exception as e:
            logger.error(f"HTTP API - Error: {e}")
        time.sleep(0.5)  # short delay between tests
    return success_count > 0
def main():
    """Run the create_data test and exit with status 1 on failure."""
    try:
        if test_create_data():
            logger.info("Test successful!")
            return
        logger.error("All test data types failed")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    sys.exit(1)


if __name__ == "__main__":
    main()

View File

@ -1,85 +0,0 @@
#!/usr/bin/env python3
"""
Test script to verify the delete_data functionality works properly.
"""
import json
import logging
import sys
import requests
import time
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("delete_data_test")
def test_delete_data():
    """Create a throwaway data item, then verify it can be deleted.

    Returns:
        True if the delete request reported success, False otherwise.
    """
    address = "08000100"  # expected to be a valid address in the memory map

    # Step 1: create something to delete.
    logger.info(f"Creating test data at {address}")
    try:
        created = requests.post(
            "http://localhost:8192/data",
            json={"address": address, "type": "byte", "newName": "TEST_DELETE_ME"},
        )
        logger.info(f"Create response: {created.status_code}")
        logger.info(f"Create response: {created.text}")
        ok = created.status_code == 200 and json.loads(created.text).get("success", False)
        if not ok:
            logger.warning("Failed to create test data, test may fail")
    except Exception as e:
        logger.error(f"Error creating test data: {e}")

    time.sleep(1)  # brief pause before the delete request

    # Step 2: delete it again.
    logger.info(f"Deleting data at {address}")
    try:
        deleted = requests.post(
            "http://localhost:8192/data/delete",
            json={"address": address, "action": "delete"},
        )
        logger.info(f"Delete response: {deleted.status_code}")
        logger.info(f"Delete response: {deleted.text}")
        if deleted.status_code == 200 and json.loads(deleted.text).get("success", False):
            logger.info("Successfully deleted data!")
            return True
        logger.warning("Failed to delete data")
        return False
    except Exception as e:
        logger.error(f"Error deleting data: {e}")
        return False
def main():
    """Run the delete_data test; exit with status 1 on failure or error."""
    try:
        if test_delete_data():
            logger.info("Test successful!")
            return
        logger.error("Test failed")
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
    sys.exit(1)


if __name__ == "__main__":
    main()

View File

@ -1,126 +1,417 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Test script for data operations in GhydraMCP bridge. Comprehensive test script for data operations in GhydraMCP.
This script tests renaming and changing data types.
This script tests all data-related operations including:
1. Creating data items with different types
2. Renaming data items
3. Updating data types
4. Deleting data items
5. Reading memory
Tests are performed using both direct HTTP API and MCP bridge interfaces.
""" """
import json import json
import logging import logging
import sys import sys
import time import time
import requests
import anyio
from typing import Dict, Any
from urllib.parse import quote from urllib.parse import quote
import anyio
from mcp.client.session import ClientSession from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client from mcp.client.stdio import StdioServerParameters, stdio_client
# Setup logging # Configure logging
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_test") logger = logging.getLogger("data_test")
async def test_data_operations(): # Configure default test values
"""Test data operations using the MCP client""" GHIDRA_PORT = 8192
# Configure the server parameters DEFAULT_MEMORY_ADDRESS = "08000200" # Fallback test address
def wait_for_program_loaded(port=GHIDRA_PORT, timeout=20):
"""Wait for a Ghidra program to be loaded."""
for _ in range(timeout // 2):
try:
response = requests.get(f"http://localhost:{port}/program")
if response.status_code == 200:
data = json.loads(response.text)
if data.get("success", False):
logger.info(f"Program loaded: {data['result']['name']}")
return True
except Exception as e:
logger.warning(f"Error checking program status: {e}")
logger.info("Waiting for program to load...")
time.sleep(2)
logger.error("Timed out waiting for program to load")
return False
def find_valid_addresses(port=GHIDRA_PORT) -> list:
"""Find valid memory addresses for testing by checking memory map."""
try:
response = requests.get(f"http://localhost:{port}/memory")
memory_info = json.loads(response.text)
memory_blocks = memory_info.get("result", [])
valid_addresses = []
# First try to find a RAM block
for block in memory_blocks:
if "start" in block and "name" in block and "RAM" in block["name"].upper():
addr_base = int(block["start"], 16)
for i in range(10):
valid_addresses.append(f"{addr_base + i*4:08x}")
return valid_addresses
# If no RAM blocks, try any memory block
for block in memory_blocks:
if "start" in block:
addr_base = int(block["start"], 16)
for i in range(10):
valid_addresses.append(f"{addr_base + i*4:08x}")
return valid_addresses
except Exception as e:
logger.error(f"Error getting memory map: {e}")
# Fallback addresses if cannot determine from memory map
return ["08000100", "08000104", "08000108", "0800010c", "08000110"]
def test_http_data_create():
"""Test creating data items with different types using HTTP API."""
if not wait_for_program_loaded():
return False
addresses = find_valid_addresses()
if not addresses:
logger.error("No valid addresses found for data creation test")
return False
types_to_try = ["uint", "int", "uint *", "int *", "byte", "word", "dword", "pointer"]
success_count = 0
for i, data_type in enumerate(types_to_try):
address = addresses[i % len(addresses)]
logger.info(f"Testing data type: {data_type} at address {address}")
url = f"http://localhost:{GHIDRA_PORT}/data"
payload = {
"address": address,
"type": data_type,
"newName": f"TEST_{data_type.upper()}"
}
# Add size for string types
if data_type.lower() == "string":
payload["size"] = 16
try:
response = requests.post(url, json=payload)
logger.info(f"Status: {response.status_code}")
logger.info(f"Response: {response.text}")
if response.status_code == 200 and json.loads(response.text).get("success", False):
success_count += 1
logger.info(f"Success with data type {data_type}")
except Exception as e:
logger.error(f"Error: {e}")
time.sleep(0.5)
return success_count > 0
def test_http_data_rename():
"""Test data rename operations using HTTP API."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[0]
test_name = f"TEST_RENAME_{int(time.time())}"
# First create a data item to rename
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "int",
"newName": "TEST_BEFORE_RENAME"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for rename test")
return False
# Rename the data
rename_payload = {
"address": test_address,
"newName": test_name
}
rename_response = requests.post(create_url, json=rename_payload)
logger.info(f"Rename response: {rename_response.status_code}")
logger.info(f"Rename response: {rename_response.text}")
return rename_response.status_code == 200 and json.loads(rename_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in rename test: {e}")
return False
def test_http_data_type_change():
"""Test changing data type using HTTP API."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[1]
# First create a data item
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "uint",
"newName": "TEST_TYPE_CHANGE"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for type change test")
return False
# Change the type
type_url = f"http://localhost:{GHIDRA_PORT}/data/type"
type_payload = {
"address": test_address,
"type": "byte"
}
type_response = requests.post(type_url, json=type_payload)
logger.info(f"Type change response: {type_response.status_code}")
logger.info(f"Type change response: {type_response.text}")
return type_response.status_code == 200 and json.loads(type_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in type change test: {e}")
return False
def test_http_data_delete():
"""Test deleting data using HTTP API."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[2]
# First create a data item to delete
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "int",
"newName": "TEST_DELETE_ME"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for delete test")
return False
# Delete the data
delete_url = f"http://localhost:{GHIDRA_PORT}/data/delete"
delete_payload = {
"address": test_address,
"action": "delete"
}
delete_response = requests.post(delete_url, json=delete_payload)
logger.info(f"Delete response: {delete_response.status_code}")
logger.info(f"Delete response: {delete_response.text}")
return delete_response.status_code == 200 and json.loads(delete_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in delete test: {e}")
return False
def test_http_combined_operations():
"""Test data operations that update both name and type together."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[3]
# First create a data item
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "int",
"newName": "TEST_COMBINED_ORIG"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for combined update test")
return False
# Update both name and type in one operation
update_url = f"http://localhost:{GHIDRA_PORT}/data"
update_payload = {
"address": test_address,
"newName": "TEST_COMBINED_NEW",
"type": "uint"
}
update_response = requests.post(update_url, json=update_payload)
logger.info(f"Combined update response: {update_response.status_code}")
logger.info(f"Combined update response: {update_response.text}")
return update_response.status_code == 200 and json.loads(update_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in combined update test: {e}")
return False
async def test_mcp_data_operations():
"""Test data operations using the MCP bridge."""
server_parameters = StdioServerParameters( server_parameters = StdioServerParameters(
command=sys.executable, command=sys.executable,
args=["bridge_mcp_hydra.py"], args=["bridge_mcp_hydra.py"],
) )
# Connect to the bridge logger.info("Connecting to MCP bridge...")
logger.info("Connecting to bridge...")
async with stdio_client(server_parameters) as (read_stream, write_stream): async with stdio_client(server_parameters) as (read_stream, write_stream):
# Create a session
logger.info("Creating session...")
async with ClientSession(read_stream, write_stream) as session: async with ClientSession(read_stream, write_stream) as session:
# Initialize the session
logger.info("Initializing session...") logger.info("Initializing session...")
init_result = await session.initialize() await session.initialize()
logger.info(f"Initialization result: {init_result}")
# List data to find a data item to test with # First set the current instance
logger.info("Listing data...") logger.info("Setting current Ghidra instance...")
list_data_result = await session.call_tool( await session.call_tool(
"list_data_items", "instances_use",
arguments={"port": 8192, "limit": 5} arguments={"port": 8192}
) )
list_data_data = json.loads(list_data_result.content[0].text)
logger.info(f"List data result: {list_data_data}")
if "result" not in list_data_data or not list_data_data.get("result"): # Get a valid address to work with
logger.error("No data items found - cannot proceed with test") addresses = find_valid_addresses()
return test_address = addresses[4] if addresses and len(addresses) > 4 else DEFAULT_MEMORY_ADDRESS
# Get the first data item for testing logger.info(f"Using address {test_address} for MCP data operations test")
data_item = list_data_data["result"][0]
data_address = data_item.get("address")
original_name = data_item.get("label")
if not data_address: # Test data_create
logger.error("No address found in data item - cannot proceed with test") try:
return logger.info("Testing data_create...")
create_result = await session.call_tool(
logger.info(f"Testing with data at address {data_address}, original name: {original_name}") "data_create",
arguments={"address": test_address, "data_type": "uint"}
# Test renaming the data )
test_name = f"TEST_DATA_{int(time.time())}" create_data = json.loads(create_result.content[0].text)
logger.info(f"Renaming data to {test_name}") assert create_data.get("success", False), "data_create failed"
logger.info("data_create passed")
# Test data_rename
logger.info("Testing data_rename...")
test_name = f"MCP_TEST_{int(time.time())}"
rename_result = await session.call_tool( rename_result = await session.call_tool(
"update_data", "data_rename",
arguments={"port": 8192, "address": data_address, "name": test_name} arguments={"address": test_address, "name": test_name}
) )
rename_data = json.loads(rename_result.content[0].text) rename_data = json.loads(rename_result.content[0].text)
logger.info(f"Rename result: {rename_data}") assert rename_data.get("success", False), "data_rename failed"
logger.info("data_rename passed")
if not rename_data.get("success", False): # Test data_set_type
logger.error(f"Failed to rename data: {rename_data.get('error', {}).get('message', 'Unknown error')}") logger.info("Testing data_set_type...")
else: set_type_result = await session.call_tool(
logger.info("Data renamed successfully") "data_set_type",
arguments={"address": test_address, "data_type": "byte"}
# Test changing the data type
test_type = "uint32_t *" # Pointer to uint32_t - adjust as needed for your test data
logger.info(f"Changing data type to {test_type}")
type_result = await session.call_tool(
"update_data",
arguments={"port": 8192, "address": data_address, "data_type": test_type}
) )
set_type_data = json.loads(set_type_result.content[0].text)
assert set_type_data.get("success", False), "data_set_type failed"
logger.info("data_set_type passed")
type_data = json.loads(type_result.content[0].text) # Test memory_read on the data
logger.info(f"Change type result: {type_data}") logger.info("Testing memory_read...")
read_result = await session.call_tool(
if not type_data.get("success", False): "memory_read",
logger.error(f"Failed to change data type: {type_data.get('error', {}).get('message', 'Unknown error')}") arguments={"address": test_address, "length": 4}
else:
logger.info("Data type changed successfully")
# Test both operations together
logger.info(f"Restoring original name and trying different type")
combined_result = await session.call_tool(
"update_data",
arguments={
"port": 8192,
"address": data_address,
"name": original_name,
"data_type": "uint32_t"
}
) )
read_data = json.loads(read_result.content[0].text)
assert read_data.get("success", False), "memory_read failed"
assert "hexBytes" in read_data, "memory_read response missing hexBytes"
logger.info("memory_read passed")
combined_data = json.loads(combined_result.content[0].text) # Test data_delete
logger.info(f"Combined update result: {combined_data}") logger.info("Testing data_delete...")
delete_result = await session.call_tool(
"data_delete",
arguments={"address": test_address}
)
delete_data = json.loads(delete_result.content[0].text)
assert delete_data.get("success", False), "data_delete failed"
logger.info("data_delete passed")
if not combined_data.get("success", False): logger.info("All MCP data operations passed")
logger.error(f"Failed to perform combined update: {combined_data.get('error', {}).get('message', 'Unknown error')}") return True
else:
logger.info("Combined update successful") except Exception as e:
logger.error(f"Error in MCP data operations test: {e}")
# Try to clean up
try:
await session.call_tool("data_delete", arguments={"address": test_address})
except:
pass
return False
def main(): def main():
"""Main entry point""" """Main entry point for data operations tests."""
all_passed = True
try: try:
anyio.run(test_data_operations) # Run HTTP API tests
logger.info("===== Testing HTTP API Data Operations =====")
logger.info("----- Testing data creation -----")
create_result = test_http_data_create()
logger.info(f"Data creation test: {'PASSED' if create_result else 'FAILED'}")
all_passed = all_passed and create_result
logger.info("----- Testing data rename -----")
rename_result = test_http_data_rename()
logger.info(f"Data rename test: {'PASSED' if rename_result else 'FAILED'}")
all_passed = all_passed and rename_result
logger.info("----- Testing data type change -----")
type_result = test_http_data_type_change()
logger.info(f"Data type change test: {'PASSED' if type_result else 'FAILED'}")
all_passed = all_passed and type_result
logger.info("----- Testing data delete -----")
delete_result = test_http_data_delete()
logger.info(f"Data delete test: {'PASSED' if delete_result else 'FAILED'}")
all_passed = all_passed and delete_result
logger.info("----- Testing combined operations -----")
combined_result = test_http_combined_operations()
logger.info(f"Combined operations test: {'PASSED' if combined_result else 'FAILED'}")
all_passed = all_passed and combined_result
# Run MCP bridge tests
logger.info("===== Testing MCP Bridge Data Operations =====")
mcp_result = anyio.run(test_mcp_data_operations)
logger.info(f"MCP data operations test: {'PASSED' if mcp_result else 'FAILED'}")
all_passed = all_passed and mcp_result
logger.info(f"Overall data operations test: {'PASSED' if all_passed else 'FAILED'}")
if not all_passed:
sys.exit(1)
except Exception as e: except Exception as e:
logger.error(f"Error: {e}") logger.error(f"Unexpected error in data tests: {e}")
sys.exit(1) sys.exit(1)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -1,54 +0,0 @@
#!/usr/bin/env python3
"""
Direct test for data operations.
"""
import json
import logging
import sys
import requests
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("simple_test")
def test_create_data():
    """Try creating data at a fixed address with several data types.

    Returns:
        True as soon as one data type is accepted by the API,
        False if every type fails.
    """
    address = "08000000"
    types_to_try = ["uint32_t", "int", "dword", "byte", "pointer"]
    url = "http://localhost:8192/data"
    for data_type in types_to_try:
        logger.info(f"Testing data type: {data_type}")
        payload = {
            "address": address,
            "type": data_type,
            "newName": f"TEST_{data_type.upper()}"  # Include a name for the data
        }
        try:
            response = requests.post(url, json=payload)
            logger.info(f"Status: {response.status_code}")
            logger.info(f"Response: {response.text}")
            # HTTP 200 alone is not enough: the API can answer 200 with
            # {"success": false}, so check the body's success flag too
            # (the other test scripts in this suite do the same).
            if response.status_code == 200 and response.json().get("success", False):
                logger.info(f"Success with data type {data_type}")
                return True
        except Exception as e:
            logger.error(f"Error: {e}")
    return False
def main():
    """Run the create_data smoke test; exit with status 1 on failure."""
    try:
        result = test_create_data()
        if result:
            logger.info("Test successful!")
        else:
            logger.error("All test data types failed")
            # Exit non-zero so callers (and CI) see the failure —
            # the other test scripts in this suite do the same; without
            # this the script reports success even when every type failed.
            sys.exit(1)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@ -1,105 +0,0 @@
#!/usr/bin/env python3
"""
Test script for setting data types in GhydraMCP bridge.
"""
import json
import logging
import sys
import time
from urllib.parse import quote
import anyio
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_type_test")
async def test_set_data_type():
    """Exercise the set_data_type tool through the MCP bridge.

    Connects to the bridge over stdio, verifies the tool is registered,
    picks the first listed data item, and tries a series of simple data
    types until one is applied successfully.
    """
    # Configure the server parameters
    server_parameters = StdioServerParameters(
        command=sys.executable,
        args=["bridge_mcp_hydra.py"],
    )
    # Connect to the bridge
    logger.info("Connecting to bridge...")
    async with stdio_client(server_parameters) as (read_stream, write_stream):
        # Create a session
        logger.info("Creating session...")
        async with ClientSession(read_stream, write_stream) as session:
            # Initialize the session
            logger.info("Initializing session...")
            init_result = await session.initialize()
            logger.info(f"Initialization result: {init_result}")
            # Make sure the tool under test is actually registered.
            logger.info("Listing tools...")
            tools_result = await session.list_tools()
            # list_tools() returns a ListToolsResult whose tools live in
            # `.tools`; `.content` only exists on tool-call results, so the
            # previous `tools_result.content[0].text` raised AttributeError.
            tool_names = [t.name for t in tools_result.tools]
            logger.info(f"Available tools: {tool_names}")
            if "set_data_type" not in tool_names:
                logger.error("set_data_type tool not found!")
                return
            # List data to find a data item to test with
            logger.info("Listing data...")
            list_data_result = await session.call_tool(
                "list_data_items",
                arguments={"port": 8192, "limit": 5}
            )
            list_data_data = json.loads(list_data_result.content[0].text)
            if "result" not in list_data_data or not list_data_data.get("result"):
                logger.error("No data items found - cannot proceed with test")
                return
            # Get the first data item for testing
            data_item = list_data_data["result"][0]
            data_address = data_item.get("address")
            original_type = data_item.get("dataType")
            if not data_address:
                logger.error("No address found in data item - cannot proceed with test")
                return
            logger.info(f"Testing with data at address {data_address}, original type: {original_type}")
            # Try simple types until one is accepted.
            simple_tests = ["uint32_t", "int", "byte", "word", "dword"]
            for test_type in simple_tests:
                logger.info(f"Testing type: {test_type}")
                set_type_result = await session.call_tool(
                    "set_data_type",
                    arguments={"port": 8192, "address": data_address, "data_type": test_type}
                )
                try:
                    set_type_data = json.loads(set_type_result.content[0].text)
                    logger.info(f"Result: {set_type_data}")
                    if set_type_data.get("success", False):
                        logger.info(f"Successfully set type to {test_type}")
                        break
                    else:
                        logger.warning(f"Failed to set type to {test_type}: {set_type_data.get('error', {}).get('message', 'Unknown error')}")
                except Exception as e:
                    logger.error(f"Error processing result: {e}")
def main():
    """Drive the async data-type test; exit non-zero on any error."""
    try:
        anyio.run(test_set_data_type)
    except Exception as exc:
        logger.error(f"Error: {exc}")
        sys.exit(1)


if __name__ == "__main__":
    main()

View File

@ -1,181 +0,0 @@
#!/usr/bin/env python3
"""
Dedicated test script for the GhydraMCP data handling API.
This script has standalone tests to validate the three key data manipulation operations:
1. Rename only - Change the name without changing the data type
2. Type change only - Change the data type while preserving the name
3. Update both - Change both name and type simultaneously
These tests operate on a low level and can be run independently of the main test suite
to diagnose issues with the API's data handling capabilities.
Usage:
python test_data_update.py
"""
import json
import requests
import sys
import argparse
BASE_URL = "http://localhost:8192"
def _post_json(url, payload, verbose):
    """POST *payload* as JSON to *url*; optionally echo request and response.

    Args:
        url: Full endpoint URL.
        payload: JSON-serializable request body.
        verbose: Whether to print the request, status and response body.

    Returns:
        requests.Response: The raw response object.
    """
    if verbose:
        print(f"Request: POST {url}")
        print(f"Payload: {json.dumps(payload, indent=2)}")
    response = requests.post(url, json=payload)
    if verbose:
        print(f"Status: {response.status_code}")
        print(f"Response: {json.dumps(response.json(), indent=2)}")
    return response


def _passed(response):
    """Return True when the HTTP status is 200 and the API reports success."""
    return response.status_code == 200 and response.json().get("success")


def test_data_update(verbose=True, base_url=None):
    """Test data update operations

    Exercises four scenarios against the Ghidra HTTP API: rename only,
    type change only, and combined name+type change via both the
    /data/update and /data endpoints.

    Args:
        verbose: Whether to print detailed output
        base_url: Base URL for the Ghidra HTTP API (default: http://localhost:8192)

    Returns:
        bool: True if all tests pass, False otherwise
    """
    if base_url:
        # Preserve the historical side effect: the module-level BASE_URL is
        # rebound so later calls without base_url keep using the override.
        global BASE_URL
        BASE_URL = base_url

    # First find a suitable data item to test with.
    if verbose:
        print("Fetching data items...")
    response = requests.get(f"{BASE_URL}/data?limit=1")
    if response.status_code != 200:
        print(f"Error: Failed to fetch data items, status {response.status_code}")
        print(response.text)
        return False
    data = response.json()
    if not data.get("success"):
        print(f"Error: API call failed: {data.get('error', 'Unknown error')}")
        return False

    # Extract the address from the first data item.
    result = data.get("result", [])
    if not result or not isinstance(result, list) or not result[0].get("address"):
        print("Error: No data items found or invalid response format")
        if result and verbose:
            print(f"Result: {json.dumps(result, indent=2)}")
        return False
    address = result[0]["address"]
    if verbose:
        print(f"Using data item at address: {address}")

    # (header shown before the test, label used in errors/summary,
    #  endpoint path, request payload) — one entry per scenario.
    scenarios = [
        ("Rename Only", "Rename Only", "/data",
         {"address": address, "newName": "TEST_DATA_RENAME"}),
        ("Type Change Only", "Type Change Only", "/data/type",
         {"address": address, "type": "int"}),
        ("Both Name and Type Change",
         "Both Name and Type Change via /data/update", "/data/update",
         {"address": address, "newName": "TEST_DATA_BOTH", "type": "byte"}),
        ("Direct Request to /data endpoint",
         "Both Name and Type Change via /data", "/data",
         {"address": address, "newName": "TEST_DIRECT_UPDATE", "type": "int"}),
    ]

    outcomes = []
    for number, (header, label, endpoint, payload) in enumerate(scenarios, start=1):
        if verbose:
            print(f"\n--- Test {number}: {header} ---")
        response = _post_json(f"{BASE_URL}{endpoint}", payload, verbose)
        passed = _passed(response)
        if not passed:
            print(f"ERROR: Test {number} ({label}) failed: {response.status_code}")
        outcomes.append((label, passed))

    all_tests_passed = all(passed for _, passed in outcomes)

    # Print summary
    if verbose:
        print("\n--- Test Summary ---")
        for number, (label, passed) in enumerate(outcomes, start=1):
            print(f"Test {number} ({label}): {'PASSED' if passed else 'FAILED'}")
        print(f"Overall: {'ALL TESTS PASSED' if all_tests_passed else 'SOME TESTS FAILED'}")
    return all_tests_passed
if __name__ == "__main__":
    # Command-line entry point: run the data-update checks and exit
    # with a non-zero status when any of them fails.
    arg_parser = argparse.ArgumentParser(
        description="Test data operations in the GhydraMCP HTTP API")
    arg_parser.add_argument("--quiet", "-q", action="store_true",
                            help="Suppress detailed output")
    arg_parser.add_argument("--url", "-u",
                            help="Base URL for the Ghidra HTTP API")
    opts = arg_parser.parse_args()
    if not test_data_update(not opts.quiet, opts.url):
        sys.exit(1)