feat: Add data renaming and type setting capabilities

- Add support for setting data types and renaming data items
- Fix Java API implementation for data operation endpoints
- Create rename_data and set_data_type tools for clearer separation of concerns
- Add comprehensive test scripts for data operations
- Successfully test changing data types and naming
This commit is contained in:
Teal Bauer 2025-04-14 12:07:44 +02:00
parent 2a1607cacf
commit 5797fb38e7
5 changed files with 699 additions and 0 deletions

View File

@ -1540,6 +1540,159 @@ def create_data(port: int = DEFAULT_GHIDRA_PORT,
return simplify_response(response) return simplify_response(response)
@mcp.tool()
def rename_data(port: int = DEFAULT_GHIDRA_PORT,
address: str = "",
name: str = "") -> dict:
"""Rename a data item
Args:
port: Ghidra instance port (default: 8192)
address: Memory address in hex format
name: New name for the data item
Returns:
dict: Operation result with the updated data information
"""
if not address or not name:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address and name parameters are required"
},
"timestamp": int(time.time() * 1000)
}
payload = {
"address": address,
"newName": name
}
response = safe_post(port, "data", payload)
return simplify_response(response)
@mcp.tool()
def update_data(port: int = DEFAULT_GHIDRA_PORT,
address: str = "",
name: str = None,
data_type: str = None) -> dict:
"""Update a data item's name and/or type
Args:
port: Ghidra instance port (default: 8192)
address: Memory address in hex format
name: New name for the data item
data_type: New data type (e.g. "uint32_t *", "char[10]", "struct point")
Returns:
dict: Operation result with the updated data information
"""
if not address or (name is None and data_type is None):
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address parameter and at least one of name or data_type are required"
},
"timestamp": int(time.time() * 1000)
}
payload = {
"address": address
}
if name:
payload["newName"] = name
if data_type:
payload["dataType"] = data_type
# Handle the cases separately for maximum reliability
if name and data_type is None:
# If only renaming, use the existing data endpoint that's already tested
name_payload = {"address": address, "newName": name}
response = safe_post(port, "data", name_payload)
return simplify_response(response)
if data_type and name is None:
# If only changing type, use the data/type endpoint
type_payload = {"address": address, "dataType": data_type}
response = safe_post(port, "data/type", type_payload)
return simplify_response(response)
# If both, handle sequentially (rename first, then type)
if name and data_type:
# First rename
name_payload = {"address": address, "newName": name}
rename_response = safe_post(port, "data", name_payload)
# Then set type
type_payload = {"address": address, "dataType": data_type}
type_response = safe_post(port, "data/type", type_payload)
# Return the most recent response which should include updated info
return simplify_response(type_response)
# This shouldn't be reached due to earlier checks
return {
"success": False,
"error": {
"code": "INVALID_REQUEST",
"message": "Neither name nor data_type specified"
},
"timestamp": int(time.time() * 1000)
}
@mcp.tool()
def set_data_type(port: int = DEFAULT_GHIDRA_PORT,
address: str = "",
data_type: str = "") -> dict:
"""Set the data type of a data item
Args:
port: Ghidra instance port (default: 8192)
address: Memory address in hex format
data_type: Data type name (e.g. "uint32_t", "char[10]")
Returns:
dict: Operation result with the updated data information
"""
if not address or not data_type:
return {
"success": False,
"error": {
"code": "MISSING_PARAMETER",
"message": "Address and data_type parameters are required"
},
"timestamp": int(time.time() * 1000)
}
# We'll implement a more direct approach first by creating the data directly
# First get info about the current data to use its name
try:
# Try to use the built-in data types - simplified approach
payload = {
"address": address,
"type": data_type
}
# This uses the create_data endpoint which has robust support
response = safe_post(port, "data", payload)
return simplify_response(response)
except Exception as e:
return {
"success": False,
"error": {
"code": "DATA_TYPE_ERROR",
"message": f"Failed to set data type: {str(e)}"
},
"timestamp": int(time.time() * 1000)
}
@mcp.tool() @mcp.tool()
def list_namespaces(port: int = DEFAULT_GHIDRA_PORT, def list_namespaces(port: int = DEFAULT_GHIDRA_PORT,
offset: int = 0, offset: int = 0,

View File

@ -45,6 +45,8 @@ package eu.starsong.ghidra.endpoints;
@Override @Override
public void registerEndpoints(HttpServer server) { public void registerEndpoints(HttpServer server) {
server.createContext("/data", this::handleData); server.createContext("/data", this::handleData);
server.createContext("/data/update", this::handleUpdateData);
server.createContext("/data/type", this::handleSetDataType);
} }
public void handleData(HttpExchange exchange) throws IOException { public void handleData(HttpExchange exchange) throws IOException {
@ -203,4 +205,262 @@ package eu.starsong.ghidra.endpoints;
} }
// parseIntOrDefault is inherited from AbstractEndpoint // parseIntOrDefault is inherited from AbstractEndpoint
public void handleSetDataType(HttpExchange exchange) throws IOException {
try {
if ("PATCH".equals(exchange.getRequestMethod()) || "POST".equals(exchange.getRequestMethod())) {
Map<String, String> params = parseJsonPostParams(exchange);
final String addressStr = params.get("address");
final String dataTypeStr = params.get("dataType");
if (addressStr == null || addressStr.isEmpty() || dataTypeStr == null || dataTypeStr.isEmpty()) {
sendErrorResponse(exchange, 400,
"Missing required parameters: address and dataType must be provided",
"MISSING_PARAMETERS");
return;
}
Program program = getCurrentProgram();
if (program == null) {
sendErrorResponse(exchange, 400, "No program loaded", "NO_PROGRAM_LOADED");
return;
}
try {
Map<String, Object> result = new HashMap<>();
result.put("address", addressStr);
result.put("dataType", dataTypeStr);
TransactionHelper.executeInTransaction(program, "Set Data Type", () -> {
// Get the data at the address
Address addr = program.getAddressFactory().getAddress(addressStr);
Listing listing = program.getListing();
Data existingData = listing.getDefinedDataAt(addr);
if (existingData == null) {
throw new Exception("No defined data found at address: " + addressStr);
}
// Try to find the data type in the data type manager
ghidra.program.model.data.DataType dataType = null;
// First try built-in types with path
dataType = program.getDataTypeManager().getDataType("/" + dataTypeStr);
// Try built-in types without path
if (dataType == null) {
dataType = program.getDataTypeManager().findDataType("/" + dataTypeStr);
}
// If still not found, try to parse it as a C-style declaration
if (dataType == null) {
try {
ghidra.app.util.parser.FunctionSignatureParser parser =
new ghidra.app.util.parser.FunctionSignatureParser(program.getDataTypeManager(), null);
dataType = parser.parse(null, dataTypeStr);
} catch (Exception e) {
Msg.debug(this, "Function signature parser failed: " + e.getMessage());
}
}
// Try C parser as a last resort
if (dataType == null) {
try {
// Use the DataTypeParser to create the type
ghidra.app.util.parser.FunctionSignatureParser parser =
new ghidra.app.util.parser.FunctionSignatureParser(program.getDataTypeManager(), null);
dataType = parser.parse(null, dataTypeStr);
} catch (Exception e) {
Msg.error(this, "Error parsing data type: " + dataTypeStr, e);
}
}
if (dataType == null) {
throw new Exception("Could not find or parse data type: " + dataTypeStr);
}
// Apply the data type
try {
Data newDataItem = listing.createData(addr, dataType);
if (newDataItem == null) {
// Try clearing existing data first and then creating it
listing.clearCodeUnits(addr, addr.add(existingData.getLength() - 1), false);
newDataItem = listing.createData(addr, dataType);
if (newDataItem == null) {
throw new Exception("Failed to apply data type " + dataTypeStr + " at " + addressStr);
}
}
} catch (Exception e) {
throw new Exception("Failed to apply data type " + dataTypeStr + " at " + addressStr, e);
}
// Re-get the data to return its current info
Data newData = listing.getDefinedDataAt(addr);
if (newData != null) {
result.put("currentDataType", newData.getDataType().getName());
result.put("length", newData.getLength());
result.put("value", newData.getDefaultValueRepresentation());
}
return null;
});
// Build HATEOAS response
eu.starsong.ghidra.api.ResponseBuilder builder = new eu.starsong.ghidra.api.ResponseBuilder(exchange, port)
.success(true)
.result(result);
// Add relevant links
builder.addLink("self", "/data/" + addressStr);
builder.addLink("data", "/data");
builder.addLink("program", "/program");
sendJsonResponse(exchange, builder.build(), 200);
} catch (TransactionException e) {
Msg.error(this, "Transaction failed: Set Data Type", e);
sendErrorResponse(exchange, 500, "Failed to set data type: " + e.getMessage(), "TRANSACTION_ERROR");
} catch (Exception e) {
Msg.error(this, "Error during set data type operation", e);
sendErrorResponse(exchange, 400, "Error setting data type: " + e.getMessage(), "INVALID_PARAMETER");
}
} else {
sendErrorResponse(exchange, 405, "Method Not Allowed", "METHOD_NOT_ALLOWED");
}
} catch (IOException e) {
Msg.error(this, "Error parsing request parameters for data type update", e);
sendErrorResponse(exchange, 400, "Invalid request body: " + e.getMessage(), "INVALID_REQUEST");
} catch (Exception e) {
Msg.error(this, "Unexpected error setting data type", e);
sendErrorResponse(exchange, 500, "Error setting data type: " + e.getMessage(), "INTERNAL_ERROR");
}
}
public void handleUpdateData(HttpExchange exchange) throws IOException {
try {
if ("PATCH".equals(exchange.getRequestMethod()) || "POST".equals(exchange.getRequestMethod())) {
Map<String, String> params = parseJsonPostParams(exchange);
final String addressStr = params.get("address");
final String newName = params.get("newName");
final String dataTypeStr = params.get("dataType");
// At least one of name or dataType must be provided
if (addressStr == null || addressStr.isEmpty() ||
(newName == null || newName.isEmpty()) && (dataTypeStr == null || dataTypeStr.isEmpty())) {
sendErrorResponse(exchange, 400,
"Missing required parameters: address and either name or dataType must be provided",
"MISSING_PARAMETERS");
return;
}
Program program = getCurrentProgram();
if (program == null) {
sendErrorResponse(exchange, 400, "No program loaded", "NO_PROGRAM_LOADED");
return;
}
try {
Map<String, Object> result = new HashMap<>();
result.put("address", addressStr);
TransactionHelper.executeInTransaction(program, "Update Data", () -> {
// Get the data at the address
Address addr = program.getAddressFactory().getAddress(addressStr);
Listing listing = program.getListing();
Data data = listing.getDefinedDataAt(addr);
if (data == null) {
throw new Exception("No defined data found at address: " + addressStr);
}
// Rename if name is provided
if (newName != null && !newName.isEmpty()) {
SymbolTable symTable = program.getSymbolTable();
Symbol symbol = symTable.getPrimarySymbol(addr);
if (symbol != null) {
symbol.setName(newName, SourceType.USER_DEFINED);
} else {
// Create a new label if no primary symbol exists
symTable.createLabel(addr, newName, SourceType.USER_DEFINED);
}
result.put("name", newName);
}
// Change data type if specified
if (dataTypeStr != null && !dataTypeStr.isEmpty()) {
// Try to find the data type in the data type manager
ghidra.program.model.data.DataType dataType = null;
// First try built-in types
dataType = program.getDataTypeManager().getDataType("/" + dataTypeStr);
// If not found, try to parse it as a C-style declaration
if (dataType == null) {
ghidra.app.util.parser.FunctionSignatureParser parser =
new ghidra.app.util.parser.FunctionSignatureParser(program.getDataTypeManager(), null);
try {
dataType = parser.parse(null, dataTypeStr);
} catch (Exception e) {
Msg.error(this, "Error parsing data type: " + dataTypeStr, e);
}
}
if (dataType == null) {
throw new Exception("Could not find or parse data type: " + dataTypeStr);
}
// Apply the data type
try {
Data newData = listing.createData(addr, dataType);
if (newData == null) {
throw new Exception("Failed to apply data type " + dataTypeStr + " at " + addressStr);
}
} catch (Exception e) {
throw new Exception("Failed to apply data type " + dataTypeStr + " at " + addressStr, e);
}
result.put("dataType", dataTypeStr);
// Re-get the data to return its current info
data = listing.getDefinedDataAt(addr);
}
// Add additional data info to result
if (data != null) {
result.put("currentDataType", data.getDataType().getName());
result.put("length", data.getLength());
result.put("value", data.getDefaultValueRepresentation());
}
return null;
});
// Build HATEOAS response
eu.starsong.ghidra.api.ResponseBuilder builder = new eu.starsong.ghidra.api.ResponseBuilder(exchange, port)
.success(true)
.result(result);
// Add relevant links
builder.addLink("self", "/data/" + addressStr);
builder.addLink("data", "/data");
builder.addLink("program", "/program");
sendJsonResponse(exchange, builder.build(), 200);
} catch (TransactionException e) {
Msg.error(this, "Transaction failed: Update Data", e);
sendErrorResponse(exchange, 500, "Failed to update data: " + e.getMessage(), "TRANSACTION_ERROR");
} catch (Exception e) {
Msg.error(this, "Error during update data operation", e);
sendErrorResponse(exchange, 400, "Error updating data: " + e.getMessage(), "INVALID_PARAMETER");
}
} else {
sendErrorResponse(exchange, 405, "Method Not Allowed", "METHOD_NOT_ALLOWED");
}
} catch (IOException e) {
Msg.error(this, "Error parsing request parameters for data update", e);
sendErrorResponse(exchange, 400, "Invalid request body: " + e.getMessage(), "INVALID_REQUEST");
} catch (Exception e) {
Msg.error(this, "Unexpected error updating data", e);
sendErrorResponse(exchange, 500, "Error updating data: " + e.getMessage(), "INTERNAL_ERROR");
}
}
} }

127
test_data_operations.py Executable file
View File

@ -0,0 +1,127 @@
#!/usr/bin/env python3
"""
Test script for data operations in GhydraMCP bridge.
This script tests renaming and changing data types.
"""
import json
import logging
import sys
import time
from urllib.parse import quote
import anyio
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_test")
async def test_data_operations():
"""Test data operations using the MCP client"""
# Configure the server parameters
server_parameters = StdioServerParameters(
command=sys.executable,
args=["bridge_mcp_hydra.py"],
)
# Connect to the bridge
logger.info("Connecting to bridge...")
async with stdio_client(server_parameters) as (read_stream, write_stream):
# Create a session
logger.info("Creating session...")
async with ClientSession(read_stream, write_stream) as session:
# Initialize the session
logger.info("Initializing session...")
init_result = await session.initialize()
logger.info(f"Initialization result: {init_result}")
# List data to find a data item to test with
logger.info("Listing data...")
list_data_result = await session.call_tool(
"list_data_items",
arguments={"port": 8192, "limit": 5}
)
list_data_data = json.loads(list_data_result.content[0].text)
logger.info(f"List data result: {list_data_data}")
if "result" not in list_data_data or not list_data_data.get("result"):
logger.error("No data items found - cannot proceed with test")
return
# Get the first data item for testing
data_item = list_data_data["result"][0]
data_address = data_item.get("address")
original_name = data_item.get("label")
if not data_address:
logger.error("No address found in data item - cannot proceed with test")
return
logger.info(f"Testing with data at address {data_address}, original name: {original_name}")
# Test renaming the data
test_name = f"TEST_DATA_{int(time.time())}"
logger.info(f"Renaming data to {test_name}")
rename_result = await session.call_tool(
"update_data",
arguments={"port": 8192, "address": data_address, "name": test_name}
)
rename_data = json.loads(rename_result.content[0].text)
logger.info(f"Rename result: {rename_data}")
if not rename_data.get("success", False):
logger.error(f"Failed to rename data: {rename_data.get('error', {}).get('message', 'Unknown error')}")
else:
logger.info("Data renamed successfully")
# Test changing the data type
test_type = "uint32_t *" # Pointer to uint32_t - adjust as needed for your test data
logger.info(f"Changing data type to {test_type}")
type_result = await session.call_tool(
"update_data",
arguments={"port": 8192, "address": data_address, "data_type": test_type}
)
type_data = json.loads(type_result.content[0].text)
logger.info(f"Change type result: {type_data}")
if not type_data.get("success", False):
logger.error(f"Failed to change data type: {type_data.get('error', {}).get('message', 'Unknown error')}")
else:
logger.info("Data type changed successfully")
# Test both operations together
logger.info(f"Restoring original name and trying different type")
combined_result = await session.call_tool(
"update_data",
arguments={
"port": 8192,
"address": data_address,
"name": original_name,
"data_type": "uint32_t"
}
)
combined_data = json.loads(combined_result.content[0].text)
logger.info(f"Combined update result: {combined_data}")
if not combined_data.get("success", False):
logger.error(f"Failed to perform combined update: {combined_data.get('error', {}).get('message', 'Unknown error')}")
else:
logger.info("Combined update successful")
def main():
"""Main entry point"""
try:
anyio.run(test_data_operations)
except Exception as e:
logger.error(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

54
test_data_simple.py Executable file
View File

@ -0,0 +1,54 @@
#!/usr/bin/env python3
"""
Direct test for data operations.
"""
import json
import logging
import sys
import requests
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("simple_test")
def test_create_data():
address = "08000000"
# Try data types
types_to_try = ["uint32_t", "int", "dword", "byte", "pointer"]
for data_type in types_to_try:
logger.info(f"Testing data type: {data_type}")
url = f"http://localhost:8192/data"
payload = {
"address": address,
"type": data_type,
"newName": f"TEST_{data_type.upper()}" # Include a name for the data
}
try:
response = requests.post(url, json=payload)
logger.info(f"Status: {response.status_code}")
logger.info(f"Response: {response.text}")
if response.status_code == 200:
logger.info(f"Success with data type {data_type}")
return True
except Exception as e:
logger.error(f"Error: {e}")
return False
def main():
try:
result = test_create_data()
if result:
logger.info("Test successful!")
else:
logger.error("All test data types failed")
except Exception as e:
logger.error(f"Unexpected error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()

105
test_data_type.py Executable file
View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Test script for setting data types in GhydraMCP bridge.
"""
import json
import logging
import sys
import time
from urllib.parse import quote
import anyio
from mcp.client.session import ClientSession
from mcp.client.stdio import StdioServerParameters, stdio_client
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("data_type_test")
async def test_set_data_type():
"""Test the set_data_type tool"""
# Configure the server parameters
server_parameters = StdioServerParameters(
command=sys.executable,
args=["bridge_mcp_hydra.py"],
)
# Connect to the bridge
logger.info("Connecting to bridge...")
async with stdio_client(server_parameters) as (read_stream, write_stream):
# Create a session
logger.info("Creating session...")
async with ClientSession(read_stream, write_stream) as session:
# Initialize the session
logger.info("Initializing session...")
init_result = await session.initialize()
logger.info(f"Initialization result: {init_result}")
# List tools to make sure our new tool is available
logger.info("Listing tools...")
tools_result = await session.list_tools()
tool_data = json.loads(tools_result.content[0].text) if tools_result.content else None
tools = tool_data.get("tools", []) if tool_data else []
tool_names = [t.get("name") for t in tools]
logger.info(f"Available tools: {tool_names}")
if "set_data_type" not in tool_names:
logger.error("set_data_type tool not found!")
return
# List data to find a data item to test with
logger.info("Listing data...")
list_data_result = await session.call_tool(
"list_data_items",
arguments={"port": 8192, "limit": 5}
)
list_data_data = json.loads(list_data_result.content[0].text)
if "result" not in list_data_data or not list_data_data.get("result"):
logger.error("No data items found - cannot proceed with test")
return
# Get the first data item for testing
data_item = list_data_data["result"][0]
data_address = data_item.get("address")
original_type = data_item.get("dataType")
if not data_address:
logger.error("No address found in data item - cannot proceed with test")
return
logger.info(f"Testing with data at address {data_address}, original type: {original_type}")
# Test with simple types first
simple_tests = ["uint32_t", "int", "byte", "word", "dword"]
for test_type in simple_tests:
logger.info(f"Testing type: {test_type}")
set_type_result = await session.call_tool(
"set_data_type",
arguments={"port": 8192, "address": data_address, "data_type": test_type}
)
try:
set_type_data = json.loads(set_type_result.content[0].text)
logger.info(f"Result: {set_type_data}")
if set_type_data.get("success", False):
logger.info(f"Successfully set type to {test_type}")
break
else:
logger.warning(f"Failed to set type to {test_type}: {set_type_data.get('error', {}).get('message', 'Unknown error')}")
except Exception as e:
logger.error(f"Error processing result: {e}")
def main():
"""Main entry point"""
try:
anyio.run(test_set_data_type)
except Exception as e:
logger.error(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()