mcghidra/test_data_operations.py
Ryan Malloy 1143489924
Some checks are pending
Build Ghidra Plugin / build (push) Waiting to run
refactor: Rename project from ghydramcp to mcghidra
- Rename src/ghydramcp → src/mcghidra
- Rename GhydraMCPPlugin.java → MCGhidraPlugin.java
- Update all imports, class names, and references
- Update pyproject.toml package name and script entry
- Update Docker image names and container prefixes
- Update environment variables: GHYDRA_* → MCGHIDRA_*
- Update all documentation references
2026-02-07 02:13:53 -07:00

418 lines
15 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Comprehensive test script for data operations in MCGhidra.
This script tests all data-related operations including:
1. Creating data items with different types
2. Renaming data items
3. Updating data types
4. Deleting data items
5. Reading memory
Tests are performed using both direct HTTP API and MCP bridge interfaces.
"""
import json
import logging
import sys
import time
import requests
import anyio
from typing import Dict, Any
from urllib.parse import quote
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_test")
# Configure default test values
GHIDRA_PORT = 8192
DEFAULT_MEMORY_ADDRESS = "08000200" # Fallback test address
def wait_for_program_loaded(port=GHIDRA_PORT, timeout=20):
"""Wait for a Ghidra program to be loaded."""
for _ in range(timeout // 2):
try:
response = requests.get(f"http://localhost:{port}/program")
if response.status_code == 200:
data = json.loads(response.text)
if data.get("success", False):
logger.info(f"Program loaded: {data['result']['name']}")
return True
except Exception as e:
logger.warning(f"Error checking program status: {e}")
logger.info("Waiting for program to load...")
time.sleep(2)
logger.error("Timed out waiting for program to load")
return False
def find_valid_addresses(port=GHIDRA_PORT) -> list:
"""Find valid memory addresses for testing by checking memory map."""
try:
response = requests.get(f"http://localhost:{port}/memory")
memory_info = json.loads(response.text)
memory_blocks = memory_info.get("result", [])
valid_addresses = []
# First try to find a RAM block
for block in memory_blocks:
if "start" in block and "name" in block and "RAM" in block["name"].upper():
addr_base = int(block["start"], 16)
for i in range(10):
valid_addresses.append(f"{addr_base + i*4:08x}")
return valid_addresses
# If no RAM blocks, try any memory block
for block in memory_blocks:
if "start" in block:
addr_base = int(block["start"], 16)
for i in range(10):
valid_addresses.append(f"{addr_base + i*4:08x}")
return valid_addresses
except Exception as e:
logger.error(f"Error getting memory map: {e}")
# Fallback addresses if cannot determine from memory map
return ["08000100", "08000104", "08000108", "0800010c", "08000110"]
def test_http_data_create():
"""Test creating data items with different types using HTTP API."""
if not wait_for_program_loaded():
return False
addresses = find_valid_addresses()
if not addresses:
logger.error("No valid addresses found for data creation test")
return False
types_to_try = ["uint", "int", "uint *", "int *", "byte", "word", "dword", "pointer"]
success_count = 0
for i, data_type in enumerate(types_to_try):
address = addresses[i % len(addresses)]
logger.info(f"Testing data type: {data_type} at address {address}")
url = f"http://localhost:{GHIDRA_PORT}/data"
payload = {
"address": address,
"type": data_type,
"newName": f"TEST_{data_type.upper()}"
}
# Add size for string types
if data_type.lower() == "string":
payload["size"] = 16
try:
response = requests.post(url, json=payload)
logger.info(f"Status: {response.status_code}")
logger.info(f"Response: {response.text}")
if response.status_code == 200 and json.loads(response.text).get("success", False):
success_count += 1
logger.info(f"Success with data type {data_type}")
except Exception as e:
logger.error(f"Error: {e}")
time.sleep(0.5)
return success_count > 0
def test_http_data_rename():
"""Test data rename operations using HTTP API."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[0]
test_name = f"TEST_RENAME_{int(time.time())}"
# First create a data item to rename
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "int",
"newName": "TEST_BEFORE_RENAME"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for rename test")
return False
# Rename the data
rename_payload = {
"address": test_address,
"newName": test_name
}
rename_response = requests.post(create_url, json=rename_payload)
logger.info(f"Rename response: {rename_response.status_code}")
logger.info(f"Rename response: {rename_response.text}")
return rename_response.status_code == 200 and json.loads(rename_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in rename test: {e}")
return False
def test_http_data_type_change():
"""Test changing data type using HTTP API."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[1]
# First create a data item
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "uint",
"newName": "TEST_TYPE_CHANGE"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for type change test")
return False
# Change the type
type_url = f"http://localhost:{GHIDRA_PORT}/data/type"
type_payload = {
"address": test_address,
"type": "byte"
}
type_response = requests.post(type_url, json=type_payload)
logger.info(f"Type change response: {type_response.status_code}")
logger.info(f"Type change response: {type_response.text}")
return type_response.status_code == 200 and json.loads(type_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in type change test: {e}")
return False
def test_http_data_delete():
"""Test deleting data using HTTP API."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[2]
# First create a data item to delete
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "int",
"newName": "TEST_DELETE_ME"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for delete test")
return False
# Delete the data
delete_url = f"http://localhost:{GHIDRA_PORT}/data/delete"
delete_payload = {
"address": test_address,
"action": "delete"
}
delete_response = requests.post(delete_url, json=delete_payload)
logger.info(f"Delete response: {delete_response.status_code}")
logger.info(f"Delete response: {delete_response.text}")
return delete_response.status_code == 200 and json.loads(delete_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in delete test: {e}")
return False
def test_http_combined_operations():
"""Test data operations that update both name and type together."""
addresses = find_valid_addresses()
if not addresses:
return False
test_address = addresses[3]
# First create a data item
create_url = f"http://localhost:{GHIDRA_PORT}/data"
create_payload = {
"address": test_address,
"type": "int",
"newName": "TEST_COMBINED_ORIG"
}
try:
create_response = requests.post(create_url, json=create_payload)
if create_response.status_code != 200:
logger.warning("Failed to create test data for combined update test")
return False
# Update both name and type in one operation
update_url = f"http://localhost:{GHIDRA_PORT}/data"
update_payload = {
"address": test_address,
"newName": "TEST_COMBINED_NEW",
"type": "uint"
}
update_response = requests.post(update_url, json=update_payload)
logger.info(f"Combined update response: {update_response.status_code}")
logger.info(f"Combined update response: {update_response.text}")
return update_response.status_code == 200 and json.loads(update_response.text).get("success", False)
except Exception as e:
logger.error(f"Error in combined update test: {e}")
return False
async def test_mcp_data_operations():
"""Test data operations using the MCP bridge."""
server_parameters = StdioServerParameters(
command=sys.executable,
args=["bridge_mcp_hydra.py"],
)
logger.info("Connecting to MCP bridge...")
async with stdio_client(server_parameters) as (read_stream, write_stream):
async with ClientSession(read_stream, write_stream) as session:
logger.info("Initializing session...")
await session.initialize()
# First set the current instance
logger.info("Setting current Ghidra instance...")
await session.call_tool(
"instances_use",
arguments={"port": 8192}
)
# Get a valid address to work with
addresses = find_valid_addresses()
test_address = addresses[4] if addresses and len(addresses) > 4 else DEFAULT_MEMORY_ADDRESS
logger.info(f"Using address {test_address} for MCP data operations test")
# Test data_create
try:
logger.info("Testing data_create...")
create_result = await session.call_tool(
"data_create",
arguments={"address": test_address, "data_type": "uint"}
)
create_data = json.loads(create_result.content[0].text)
assert create_data.get("success", False), "data_create failed"
logger.info("data_create passed")
# Test data_rename
logger.info("Testing data_rename...")
test_name = f"MCP_TEST_{int(time.time())}"
rename_result = await session.call_tool(
"data_rename",
arguments={"address": test_address, "name": test_name}
)
rename_data = json.loads(rename_result.content[0].text)
assert rename_data.get("success", False), "data_rename failed"
logger.info("data_rename passed")
# Test data_set_type
logger.info("Testing data_set_type...")
set_type_result = await session.call_tool(
"data_set_type",
arguments={"address": test_address, "data_type": "byte"}
)
set_type_data = json.loads(set_type_result.content[0].text)
assert set_type_data.get("success", False), "data_set_type failed"
logger.info("data_set_type passed")
# Test memory_read on the data
logger.info("Testing memory_read...")
read_result = await session.call_tool(
"memory_read",
arguments={"address": test_address, "length": 4}
)
read_data = json.loads(read_result.content[0].text)
assert read_data.get("success", False), "memory_read failed"
assert "hexBytes" in read_data, "memory_read response missing hexBytes"
logger.info("memory_read passed")
# Test data_delete
logger.info("Testing data_delete...")
delete_result = await session.call_tool(
"data_delete",
arguments={"address": test_address}
)
delete_data = json.loads(delete_result.content[0].text)
assert delete_data.get("success", False), "data_delete failed"
logger.info("data_delete passed")
logger.info("All MCP data operations passed")
return True
except Exception as e:
logger.error(f"Error in MCP data operations test: {e}")
# Try to clean up
try:
await session.call_tool("data_delete", arguments={"address": test_address})
except:
pass
return False
def main():
"""Main entry point for data operations tests."""
all_passed = True
try:
# Run HTTP API tests
logger.info("===== Testing HTTP API Data Operations =====")
logger.info("----- Testing data creation -----")
create_result = test_http_data_create()
logger.info(f"Data creation test: {'PASSED' if create_result else 'FAILED'}")
all_passed = all_passed and create_result
logger.info("----- Testing data rename -----")
rename_result = test_http_data_rename()
logger.info(f"Data rename test: {'PASSED' if rename_result else 'FAILED'}")
all_passed = all_passed and rename_result
logger.info("----- Testing data type change -----")
type_result = test_http_data_type_change()
logger.info(f"Data type change test: {'PASSED' if type_result else 'FAILED'}")
all_passed = all_passed and type_result
logger.info("----- Testing data delete -----")
delete_result = test_http_data_delete()
logger.info(f"Data delete test: {'PASSED' if delete_result else 'FAILED'}")
all_passed = all_passed and delete_result
logger.info("----- Testing combined operations -----")
combined_result = test_http_combined_operations()
logger.info(f"Combined operations test: {'PASSED' if combined_result else 'FAILED'}")
all_passed = all_passed and combined_result
# Run MCP bridge tests
logger.info("===== Testing MCP Bridge Data Operations =====")
mcp_result = anyio.run(test_mcp_data_operations)
logger.info(f"MCP data operations test: {'PASSED' if mcp_result else 'FAILED'}")
all_passed = all_passed and mcp_result
logger.info(f"Overall data operations test: {'PASSED' if all_passed else 'FAILED'}")
if not all_passed:
sys.exit(1)
except Exception as e:
logger.error(f"Unexpected error in data tests: {e}")
sys.exit(1)
if __name__ == "__main__":
main()