Some checks are pending
Build Ghidra Plugin / build (push) Waiting to run
- Rename src/ghydramcp → src/mcghidra - Rename GhydraMCPPlugin.java → MCGhidraPlugin.java - Update all imports, class names, and references - Update pyproject.toml package name and script entry - Update Docker image names and container prefixes - Update environment variables: GHYDRA_* → MCGHIDRA_* - Update all documentation references
418 lines
15 KiB
Python
Executable File
418 lines
15 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Comprehensive test script for data operations in MCGhidra.
|
|
|
|
This script tests all data-related operations including:
|
|
1. Creating data items with different types
|
|
2. Renaming data items
|
|
3. Updating data types
|
|
4. Deleting data items
|
|
5. Reading memory
|
|
|
|
Tests are performed using both direct HTTP API and MCP bridge interfaces.
|
|
"""
|
|
import json
|
|
import logging
|
|
import sys
|
|
import time
|
|
import requests
|
|
import anyio
|
|
from typing import Dict, Any
|
|
from urllib.parse import quote
|
|
|
|
from mcp.client.session import ClientSession
|
|
from mcp.client.stdio import StdioServerParameters, stdio_client
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger("data_test")
|
|
|
|
# Configure default test values
|
|
GHIDRA_PORT = 8192
|
|
DEFAULT_MEMORY_ADDRESS = "08000200" # Fallback test address
|
|
|
|
def wait_for_program_loaded(port=GHIDRA_PORT, timeout=20):
|
|
"""Wait for a Ghidra program to be loaded."""
|
|
for _ in range(timeout // 2):
|
|
try:
|
|
response = requests.get(f"http://localhost:{port}/program")
|
|
if response.status_code == 200:
|
|
data = json.loads(response.text)
|
|
if data.get("success", False):
|
|
logger.info(f"Program loaded: {data['result']['name']}")
|
|
return True
|
|
except Exception as e:
|
|
logger.warning(f"Error checking program status: {e}")
|
|
|
|
logger.info("Waiting for program to load...")
|
|
time.sleep(2)
|
|
|
|
logger.error("Timed out waiting for program to load")
|
|
return False
|
|
|
|
def find_valid_addresses(port=GHIDRA_PORT) -> list:
|
|
"""Find valid memory addresses for testing by checking memory map."""
|
|
try:
|
|
response = requests.get(f"http://localhost:{port}/memory")
|
|
memory_info = json.loads(response.text)
|
|
|
|
memory_blocks = memory_info.get("result", [])
|
|
valid_addresses = []
|
|
|
|
# First try to find a RAM block
|
|
for block in memory_blocks:
|
|
if "start" in block and "name" in block and "RAM" in block["name"].upper():
|
|
addr_base = int(block["start"], 16)
|
|
for i in range(10):
|
|
valid_addresses.append(f"{addr_base + i*4:08x}")
|
|
return valid_addresses
|
|
|
|
# If no RAM blocks, try any memory block
|
|
for block in memory_blocks:
|
|
if "start" in block:
|
|
addr_base = int(block["start"], 16)
|
|
for i in range(10):
|
|
valid_addresses.append(f"{addr_base + i*4:08x}")
|
|
return valid_addresses
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting memory map: {e}")
|
|
|
|
# Fallback addresses if cannot determine from memory map
|
|
return ["08000100", "08000104", "08000108", "0800010c", "08000110"]
|
|
|
|
def test_http_data_create():
|
|
"""Test creating data items with different types using HTTP API."""
|
|
if not wait_for_program_loaded():
|
|
return False
|
|
|
|
addresses = find_valid_addresses()
|
|
if not addresses:
|
|
logger.error("No valid addresses found for data creation test")
|
|
return False
|
|
|
|
types_to_try = ["uint", "int", "uint *", "int *", "byte", "word", "dword", "pointer"]
|
|
success_count = 0
|
|
|
|
for i, data_type in enumerate(types_to_try):
|
|
address = addresses[i % len(addresses)]
|
|
logger.info(f"Testing data type: {data_type} at address {address}")
|
|
|
|
url = f"http://localhost:{GHIDRA_PORT}/data"
|
|
payload = {
|
|
"address": address,
|
|
"type": data_type,
|
|
"newName": f"TEST_{data_type.upper()}"
|
|
}
|
|
|
|
# Add size for string types
|
|
if data_type.lower() == "string":
|
|
payload["size"] = 16
|
|
|
|
try:
|
|
response = requests.post(url, json=payload)
|
|
logger.info(f"Status: {response.status_code}")
|
|
logger.info(f"Response: {response.text}")
|
|
if response.status_code == 200 and json.loads(response.text).get("success", False):
|
|
success_count += 1
|
|
logger.info(f"Success with data type {data_type}")
|
|
except Exception as e:
|
|
logger.error(f"Error: {e}")
|
|
|
|
time.sleep(0.5)
|
|
|
|
return success_count > 0
|
|
|
|
def test_http_data_rename():
|
|
"""Test data rename operations using HTTP API."""
|
|
addresses = find_valid_addresses()
|
|
if not addresses:
|
|
return False
|
|
|
|
test_address = addresses[0]
|
|
test_name = f"TEST_RENAME_{int(time.time())}"
|
|
|
|
# First create a data item to rename
|
|
create_url = f"http://localhost:{GHIDRA_PORT}/data"
|
|
create_payload = {
|
|
"address": test_address,
|
|
"type": "int",
|
|
"newName": "TEST_BEFORE_RENAME"
|
|
}
|
|
|
|
try:
|
|
create_response = requests.post(create_url, json=create_payload)
|
|
if create_response.status_code != 200:
|
|
logger.warning("Failed to create test data for rename test")
|
|
return False
|
|
|
|
# Rename the data
|
|
rename_payload = {
|
|
"address": test_address,
|
|
"newName": test_name
|
|
}
|
|
|
|
rename_response = requests.post(create_url, json=rename_payload)
|
|
logger.info(f"Rename response: {rename_response.status_code}")
|
|
logger.info(f"Rename response: {rename_response.text}")
|
|
|
|
return rename_response.status_code == 200 and json.loads(rename_response.text).get("success", False)
|
|
except Exception as e:
|
|
logger.error(f"Error in rename test: {e}")
|
|
return False
|
|
|
|
def test_http_data_type_change():
|
|
"""Test changing data type using HTTP API."""
|
|
addresses = find_valid_addresses()
|
|
if not addresses:
|
|
return False
|
|
|
|
test_address = addresses[1]
|
|
|
|
# First create a data item
|
|
create_url = f"http://localhost:{GHIDRA_PORT}/data"
|
|
create_payload = {
|
|
"address": test_address,
|
|
"type": "uint",
|
|
"newName": "TEST_TYPE_CHANGE"
|
|
}
|
|
|
|
try:
|
|
create_response = requests.post(create_url, json=create_payload)
|
|
if create_response.status_code != 200:
|
|
logger.warning("Failed to create test data for type change test")
|
|
return False
|
|
|
|
# Change the type
|
|
type_url = f"http://localhost:{GHIDRA_PORT}/data/type"
|
|
type_payload = {
|
|
"address": test_address,
|
|
"type": "byte"
|
|
}
|
|
|
|
type_response = requests.post(type_url, json=type_payload)
|
|
logger.info(f"Type change response: {type_response.status_code}")
|
|
logger.info(f"Type change response: {type_response.text}")
|
|
|
|
return type_response.status_code == 200 and json.loads(type_response.text).get("success", False)
|
|
except Exception as e:
|
|
logger.error(f"Error in type change test: {e}")
|
|
return False
|
|
|
|
def test_http_data_delete():
|
|
"""Test deleting data using HTTP API."""
|
|
addresses = find_valid_addresses()
|
|
if not addresses:
|
|
return False
|
|
|
|
test_address = addresses[2]
|
|
|
|
# First create a data item to delete
|
|
create_url = f"http://localhost:{GHIDRA_PORT}/data"
|
|
create_payload = {
|
|
"address": test_address,
|
|
"type": "int",
|
|
"newName": "TEST_DELETE_ME"
|
|
}
|
|
|
|
try:
|
|
create_response = requests.post(create_url, json=create_payload)
|
|
if create_response.status_code != 200:
|
|
logger.warning("Failed to create test data for delete test")
|
|
return False
|
|
|
|
# Delete the data
|
|
delete_url = f"http://localhost:{GHIDRA_PORT}/data/delete"
|
|
delete_payload = {
|
|
"address": test_address,
|
|
"action": "delete"
|
|
}
|
|
|
|
delete_response = requests.post(delete_url, json=delete_payload)
|
|
logger.info(f"Delete response: {delete_response.status_code}")
|
|
logger.info(f"Delete response: {delete_response.text}")
|
|
|
|
return delete_response.status_code == 200 and json.loads(delete_response.text).get("success", False)
|
|
except Exception as e:
|
|
logger.error(f"Error in delete test: {e}")
|
|
return False
|
|
|
|
def test_http_combined_operations():
|
|
"""Test data operations that update both name and type together."""
|
|
addresses = find_valid_addresses()
|
|
if not addresses:
|
|
return False
|
|
|
|
test_address = addresses[3]
|
|
|
|
# First create a data item
|
|
create_url = f"http://localhost:{GHIDRA_PORT}/data"
|
|
create_payload = {
|
|
"address": test_address,
|
|
"type": "int",
|
|
"newName": "TEST_COMBINED_ORIG"
|
|
}
|
|
|
|
try:
|
|
create_response = requests.post(create_url, json=create_payload)
|
|
if create_response.status_code != 200:
|
|
logger.warning("Failed to create test data for combined update test")
|
|
return False
|
|
|
|
# Update both name and type in one operation
|
|
update_url = f"http://localhost:{GHIDRA_PORT}/data"
|
|
update_payload = {
|
|
"address": test_address,
|
|
"newName": "TEST_COMBINED_NEW",
|
|
"type": "uint"
|
|
}
|
|
|
|
update_response = requests.post(update_url, json=update_payload)
|
|
logger.info(f"Combined update response: {update_response.status_code}")
|
|
logger.info(f"Combined update response: {update_response.text}")
|
|
|
|
return update_response.status_code == 200 and json.loads(update_response.text).get("success", False)
|
|
except Exception as e:
|
|
logger.error(f"Error in combined update test: {e}")
|
|
return False
|
|
|
|
async def test_mcp_data_operations():
|
|
"""Test data operations using the MCP bridge."""
|
|
server_parameters = StdioServerParameters(
|
|
command=sys.executable,
|
|
args=["bridge_mcp_hydra.py"],
|
|
)
|
|
|
|
logger.info("Connecting to MCP bridge...")
|
|
async with stdio_client(server_parameters) as (read_stream, write_stream):
|
|
async with ClientSession(read_stream, write_stream) as session:
|
|
logger.info("Initializing session...")
|
|
await session.initialize()
|
|
|
|
# First set the current instance
|
|
logger.info("Setting current Ghidra instance...")
|
|
await session.call_tool(
|
|
"instances_use",
|
|
arguments={"port": 8192}
|
|
)
|
|
|
|
# Get a valid address to work with
|
|
addresses = find_valid_addresses()
|
|
test_address = addresses[4] if addresses and len(addresses) > 4 else DEFAULT_MEMORY_ADDRESS
|
|
|
|
logger.info(f"Using address {test_address} for MCP data operations test")
|
|
|
|
# Test data_create
|
|
try:
|
|
logger.info("Testing data_create...")
|
|
create_result = await session.call_tool(
|
|
"data_create",
|
|
arguments={"address": test_address, "data_type": "uint"}
|
|
)
|
|
create_data = json.loads(create_result.content[0].text)
|
|
assert create_data.get("success", False), "data_create failed"
|
|
logger.info("data_create passed")
|
|
|
|
# Test data_rename
|
|
logger.info("Testing data_rename...")
|
|
test_name = f"MCP_TEST_{int(time.time())}"
|
|
rename_result = await session.call_tool(
|
|
"data_rename",
|
|
arguments={"address": test_address, "name": test_name}
|
|
)
|
|
rename_data = json.loads(rename_result.content[0].text)
|
|
assert rename_data.get("success", False), "data_rename failed"
|
|
logger.info("data_rename passed")
|
|
|
|
# Test data_set_type
|
|
logger.info("Testing data_set_type...")
|
|
set_type_result = await session.call_tool(
|
|
"data_set_type",
|
|
arguments={"address": test_address, "data_type": "byte"}
|
|
)
|
|
set_type_data = json.loads(set_type_result.content[0].text)
|
|
assert set_type_data.get("success", False), "data_set_type failed"
|
|
logger.info("data_set_type passed")
|
|
|
|
# Test memory_read on the data
|
|
logger.info("Testing memory_read...")
|
|
read_result = await session.call_tool(
|
|
"memory_read",
|
|
arguments={"address": test_address, "length": 4}
|
|
)
|
|
read_data = json.loads(read_result.content[0].text)
|
|
assert read_data.get("success", False), "memory_read failed"
|
|
assert "hexBytes" in read_data, "memory_read response missing hexBytes"
|
|
logger.info("memory_read passed")
|
|
|
|
# Test data_delete
|
|
logger.info("Testing data_delete...")
|
|
delete_result = await session.call_tool(
|
|
"data_delete",
|
|
arguments={"address": test_address}
|
|
)
|
|
delete_data = json.loads(delete_result.content[0].text)
|
|
assert delete_data.get("success", False), "data_delete failed"
|
|
logger.info("data_delete passed")
|
|
|
|
logger.info("All MCP data operations passed")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in MCP data operations test: {e}")
|
|
# Try to clean up
|
|
try:
|
|
await session.call_tool("data_delete", arguments={"address": test_address})
|
|
except:
|
|
pass
|
|
return False
|
|
|
|
def main():
|
|
"""Main entry point for data operations tests."""
|
|
all_passed = True
|
|
|
|
try:
|
|
# Run HTTP API tests
|
|
logger.info("===== Testing HTTP API Data Operations =====")
|
|
|
|
logger.info("----- Testing data creation -----")
|
|
create_result = test_http_data_create()
|
|
logger.info(f"Data creation test: {'PASSED' if create_result else 'FAILED'}")
|
|
all_passed = all_passed and create_result
|
|
|
|
logger.info("----- Testing data rename -----")
|
|
rename_result = test_http_data_rename()
|
|
logger.info(f"Data rename test: {'PASSED' if rename_result else 'FAILED'}")
|
|
all_passed = all_passed and rename_result
|
|
|
|
logger.info("----- Testing data type change -----")
|
|
type_result = test_http_data_type_change()
|
|
logger.info(f"Data type change test: {'PASSED' if type_result else 'FAILED'}")
|
|
all_passed = all_passed and type_result
|
|
|
|
logger.info("----- Testing data delete -----")
|
|
delete_result = test_http_data_delete()
|
|
logger.info(f"Data delete test: {'PASSED' if delete_result else 'FAILED'}")
|
|
all_passed = all_passed and delete_result
|
|
|
|
logger.info("----- Testing combined operations -----")
|
|
combined_result = test_http_combined_operations()
|
|
logger.info(f"Combined operations test: {'PASSED' if combined_result else 'FAILED'}")
|
|
all_passed = all_passed and combined_result
|
|
|
|
# Run MCP bridge tests
|
|
logger.info("===== Testing MCP Bridge Data Operations =====")
|
|
mcp_result = anyio.run(test_mcp_data_operations)
|
|
logger.info(f"MCP data operations test: {'PASSED' if mcp_result else 'FAILED'}")
|
|
all_passed = all_passed and mcp_result
|
|
|
|
logger.info(f"Overall data operations test: {'PASSED' if all_passed else 'FAILED'}")
|
|
if not all_passed:
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Unexpected error in data tests: {e}")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |