From 5797fb38e74df89bce163d386617206d5403f6d8 Mon Sep 17 00:00:00 2001 From: Teal Bauer Date: Mon, 14 Apr 2025 12:07:44 +0200 Subject: [PATCH] feat: Add data renaming and type setting capabilities - Add support for setting data types and renaming data items - Fix Java API implementation for data operation endpoints - Create rename_data and set_data_type tools for clearer separation of concerns - Add comprehensive test scripts for data operations - Successfully test changing data types and naming --- bridge_mcp_hydra.py | 153 +++++++++++ .../ghidra/endpoints/DataEndpoints.java | 260 ++++++++++++++++++ test_data_operations.py | 127 +++++++++ test_data_simple.py | 54 ++++ test_data_type.py | 105 +++++++ 5 files changed, 699 insertions(+) create mode 100755 test_data_operations.py create mode 100755 test_data_simple.py create mode 100755 test_data_type.py diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index fc9def0..4554e29 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -1540,6 +1540,159 @@ def create_data(port: int = DEFAULT_GHIDRA_PORT, return simplify_response(response) +@mcp.tool() +def rename_data(port: int = DEFAULT_GHIDRA_PORT, + address: str = "", + name: str = "") -> dict: + """Rename a data item + + Args: + port: Ghidra instance port (default: 8192) + address: Memory address in hex format + name: New name for the data item + + Returns: + dict: Operation result with the updated data information + """ + if not address or not name: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address and name parameters are required" + }, + "timestamp": int(time.time() * 1000) + } + + payload = { + "address": address, + "newName": name + } + + response = safe_post(port, "data", payload) + return simplify_response(response) + + +@mcp.tool() +def update_data(port: int = DEFAULT_GHIDRA_PORT, + address: str = "", + name: str = None, + data_type: str = None) -> dict: + """Update a data item's name and/or type + + Args: + port: Ghidra instance port (default: 8192) + address: Memory address in hex format + name: New name for the data item + data_type: New data type (e.g. "uint32_t *", "char[10]", "struct point") + + Returns: + dict: Operation result with the updated data information + """ + if not address or (name is None and data_type is None): + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address parameter and at least one of name or data_type are required" + }, + "timestamp": int(time.time() * 1000) + } + + payload = { + "address": address + } + + if name: + payload["newName"] = name + + if data_type: + payload["dataType"] = data_type + + # Handle the cases separately for maximum reliability + if name and data_type is None: + # If only renaming, use the existing data endpoint that's already tested + name_payload = {"address": address, "newName": name} + response = safe_post(port, "data", name_payload) + return simplify_response(response) + + if data_type and name is None: + # If only changing type, use the data/type endpoint + type_payload = {"address": address, "dataType": data_type} + response = safe_post(port, "data/type", type_payload) + return simplify_response(response) + + # If both, handle sequentially (rename first, then type) + if name and data_type: + # First rename + name_payload = {"address": address, "newName": name} + rename_response = safe_post(port, "data", name_payload) + + # Then set type + type_payload = {"address": address, "dataType": data_type} + type_response = safe_post(port, "data/type", type_payload) + + # Return the most recent response which should include updated info + return simplify_response(type_response) + + # This shouldn't be reached due to earlier checks + return { + "success": False, + "error": { + "code": "INVALID_REQUEST", + "message": "Neither name nor data_type specified" + }, + "timestamp": int(time.time() * 1000) + } + + +@mcp.tool() +def set_data_type(port: int = DEFAULT_GHIDRA_PORT, + address: str = "", + data_type: str = "") -> dict: + """Set the data type of a data item + + Args: + port: Ghidra instance port (default: 8192) + address: Memory address in hex format + data_type: Data type name (e.g. "uint32_t", "char[10]") + + Returns: + dict: Operation result with the updated data information + """ + if not address or not data_type: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "Address and data_type parameters are required" + }, + "timestamp": int(time.time() * 1000) + } + + # We'll implement a more direct approach first by creating the data directly + # First get info about the current data to use its name + try: + # Try to use the built-in data types - simplified approach + payload = { + "address": address, + "type": data_type + } + + # This uses the create_data endpoint which has robust support + response = safe_post(port, "data", payload) + return simplify_response(response) + except Exception as e: + return { + "success": False, + "error": { + "code": "DATA_TYPE_ERROR", + "message": f"Failed to set data type: {str(e)}" + }, + "timestamp": int(time.time() * 1000) + } + + @mcp.tool() def list_namespaces(port: int = DEFAULT_GHIDRA_PORT, offset: int = 0, diff --git a/src/main/java/eu/starsong/ghidra/endpoints/DataEndpoints.java b/src/main/java/eu/starsong/ghidra/endpoints/DataEndpoints.java index 780dcfb..8a140c4 100644 --- a/src/main/java/eu/starsong/ghidra/endpoints/DataEndpoints.java +++ b/src/main/java/eu/starsong/ghidra/endpoints/DataEndpoints.java @@ -45,6 +45,8 @@ package eu.starsong.ghidra.endpoints; @Override public void registerEndpoints(HttpServer server) { server.createContext("/data", this::handleData); + server.createContext("/data/update", this::handleUpdateData); + server.createContext("/data/type", this::handleSetDataType); } public void handleData(HttpExchange exchange) throws IOException { @@ -203,4 +205,262 @@ package eu.starsong.ghidra.endpoints; } // parseIntOrDefault is inherited from AbstractEndpoint + + public void handleSetDataType(HttpExchange exchange) throws IOException { + try { + if ("PATCH".equals(exchange.getRequestMethod()) || "POST".equals(exchange.getRequestMethod())) { + Map params = parseJsonPostParams(exchange); + final String addressStr = params.get("address"); + final String dataTypeStr = params.get("dataType"); + + if (addressStr == null || addressStr.isEmpty() || dataTypeStr == null || dataTypeStr.isEmpty()) { + sendErrorResponse(exchange, 400, + "Missing required parameters: address and dataType must be provided", + "MISSING_PARAMETERS"); + return; + } + + Program program = getCurrentProgram(); + if (program == null) { + sendErrorResponse(exchange, 400, "No program loaded", "NO_PROGRAM_LOADED"); + return; + } + + try { + Map result = new HashMap<>(); + result.put("address", addressStr); + result.put("dataType", dataTypeStr); + + TransactionHelper.executeInTransaction(program, "Set Data Type", () -> { + // Get the data at the address + Address addr = program.getAddressFactory().getAddress(addressStr); + Listing listing = program.getListing(); + Data existingData = listing.getDefinedDataAt(addr); + + if (existingData == null) { + throw new Exception("No defined data found at address: " + addressStr); + } + + // Try to find the data type in the data type manager + ghidra.program.model.data.DataType dataType = null; + + // First try built-in types with path + dataType = program.getDataTypeManager().getDataType("/" + dataTypeStr); + + // Try built-in types without path + if (dataType == null) { + dataType = program.getDataTypeManager().findDataType("/" + dataTypeStr); + } + + // If still not found, try to parse it as a C-style declaration + if (dataType == null) { + try { + ghidra.app.util.parser.FunctionSignatureParser parser = + new ghidra.app.util.parser.FunctionSignatureParser(program.getDataTypeManager(), null); + dataType = parser.parse(null, dataTypeStr); + } catch (Exception e) { + Msg.debug(this, "Function signature parser failed: " + e.getMessage()); + } + } + + // Try C parser as a last resort + if (dataType == null) { + try { + // Use the DataTypeParser to create the type + ghidra.app.util.parser.FunctionSignatureParser parser = + new ghidra.app.util.parser.FunctionSignatureParser(program.getDataTypeManager(), null); + dataType = parser.parse(null, dataTypeStr); + } catch (Exception e) { + Msg.error(this, "Error parsing data type: " + dataTypeStr, e); + } + } + + if (dataType == null) { + throw new Exception("Could not find or parse data type: " + dataTypeStr); + } + + // Apply the data type + try { + Data newDataItem = listing.createData(addr, dataType); + if (newDataItem == null) { + // Try clearing existing data first and then creating it + listing.clearCodeUnits(addr, addr.add(existingData.getLength() - 1), false); + newDataItem = listing.createData(addr, dataType); + + if (newDataItem == null) { + throw new Exception("Failed to apply data type " + dataTypeStr + " at " + addressStr); + } + } + } catch (Exception e) { + throw new Exception("Failed to apply data type " + dataTypeStr + " at " + addressStr, e); + } + + // Re-get the data to return its current info + Data newData = listing.getDefinedDataAt(addr); + if (newData != null) { + result.put("currentDataType", newData.getDataType().getName()); + result.put("length", newData.getLength()); + result.put("value", newData.getDefaultValueRepresentation()); + } + + return null; + }); + + // Build HATEOAS response + eu.starsong.ghidra.api.ResponseBuilder builder = new eu.starsong.ghidra.api.ResponseBuilder(exchange, port) + .success(true) + .result(result); + + // Add relevant links + builder.addLink("self", "/data/" + addressStr); + builder.addLink("data", "/data"); + builder.addLink("program", "/program"); + + sendJsonResponse(exchange, builder.build(), 200); + } catch (TransactionException e) { + Msg.error(this, "Transaction failed: Set Data Type", e); + sendErrorResponse(exchange, 500, "Failed to set data type: " + e.getMessage(), "TRANSACTION_ERROR"); + } catch (Exception e) { + Msg.error(this, "Error during set data type operation", e); + sendErrorResponse(exchange, 400, "Error setting data type: " + e.getMessage(), "INVALID_PARAMETER"); + } + } else { + sendErrorResponse(exchange, 405, "Method Not Allowed", "METHOD_NOT_ALLOWED"); + } + } catch (IOException e) { + Msg.error(this, "Error parsing request parameters for data type update", e); + sendErrorResponse(exchange, 400, "Invalid request body: " + e.getMessage(), "INVALID_REQUEST"); + } catch (Exception e) { + Msg.error(this, "Unexpected error setting data type", e); + sendErrorResponse(exchange, 500, "Error setting data type: " + e.getMessage(), "INTERNAL_ERROR"); + } + } + + public void handleUpdateData(HttpExchange exchange) throws IOException { + try { + if ("PATCH".equals(exchange.getRequestMethod()) || "POST".equals(exchange.getRequestMethod())) { + Map params = parseJsonPostParams(exchange); + final String addressStr = params.get("address"); + final String newName = params.get("newName"); + final String dataTypeStr = params.get("dataType"); + + // At least one of name or dataType must be provided + if (addressStr == null || addressStr.isEmpty() || + (newName == null || newName.isEmpty()) && (dataTypeStr == null || dataTypeStr.isEmpty())) { + sendErrorResponse(exchange, 400, + "Missing required parameters: address and either name or dataType must be provided", + "MISSING_PARAMETERS"); + return; + } + + Program program = getCurrentProgram(); + if (program == null) { + sendErrorResponse(exchange, 400, "No program loaded", "NO_PROGRAM_LOADED"); + return; + } + + try { + Map result = new HashMap<>(); + result.put("address", addressStr); + + TransactionHelper.executeInTransaction(program, "Update Data", () -> { + // Get the data at the address + Address addr = program.getAddressFactory().getAddress(addressStr); + Listing listing = program.getListing(); + Data data = listing.getDefinedDataAt(addr); + + if (data == null) { + throw new Exception("No defined data found at address: " + addressStr); + } + + // Rename if name is provided + if (newName != null && !newName.isEmpty()) { + SymbolTable symTable = program.getSymbolTable(); + Symbol symbol = symTable.getPrimarySymbol(addr); + if (symbol != null) { + symbol.setName(newName, SourceType.USER_DEFINED); + } else { + // Create a new label if no primary symbol exists + symTable.createLabel(addr, newName, SourceType.USER_DEFINED); + } + result.put("name", newName); + } + + // Change data type if specified + if (dataTypeStr != null && !dataTypeStr.isEmpty()) { + // Try to find the data type in the data type manager + ghidra.program.model.data.DataType dataType = null; + + // First try built-in types + dataType = program.getDataTypeManager().getDataType("/" + dataTypeStr); + + // If not found, try to parse it as a C-style declaration + if (dataType == null) { + ghidra.app.util.parser.FunctionSignatureParser parser = + new ghidra.app.util.parser.FunctionSignatureParser(program.getDataTypeManager(), null); + try { + dataType = parser.parse(null, dataTypeStr); + } catch (Exception e) { + Msg.error(this, "Error parsing data type: " + dataTypeStr, e); + } + } + + if (dataType == null) { + throw new Exception("Could not find or parse data type: " + dataTypeStr); + } + + // Apply the data type + try { + Data newData = listing.createData(addr, dataType); + if (newData == null) { + throw new Exception("Failed to apply data type " + dataTypeStr + " at " + addressStr); + } + } catch (Exception e) { + throw new Exception("Failed to apply data type " + dataTypeStr + " at " + addressStr, e); + } + + result.put("dataType", dataTypeStr); + // Re-get the data to return its current info + data = listing.getDefinedDataAt(addr); + } + + // Add additional data info to result + if (data != null) { + result.put("currentDataType", data.getDataType().getName()); + result.put("length", data.getLength()); + result.put("value", data.getDefaultValueRepresentation()); + } + + return null; + }); + + // Build HATEOAS response + eu.starsong.ghidra.api.ResponseBuilder builder = new eu.starsong.ghidra.api.ResponseBuilder(exchange, port) + .success(true) + .result(result); + + // Add relevant links + builder.addLink("self", "/data/" + addressStr); + builder.addLink("data", "/data"); + builder.addLink("program", "/program"); + + sendJsonResponse(exchange, builder.build(), 200); + } catch (TransactionException e) { + Msg.error(this, "Transaction failed: Update Data", e); + sendErrorResponse(exchange, 500, "Failed to update data: " + e.getMessage(), "TRANSACTION_ERROR"); + } catch (Exception e) { + Msg.error(this, "Error during update data operation", e); + sendErrorResponse(exchange, 400, "Error updating data: " + e.getMessage(), "INVALID_PARAMETER"); + } + } else { + sendErrorResponse(exchange, 405, "Method Not Allowed", "METHOD_NOT_ALLOWED"); + } + } catch (IOException e) { + Msg.error(this, "Error parsing request parameters for data update", e); + sendErrorResponse(exchange, 400, "Invalid request body: " + e.getMessage(), "INVALID_REQUEST"); + } catch (Exception e) { + Msg.error(this, "Unexpected error updating data", e); + sendErrorResponse(exchange, 500, "Error updating data: " + e.getMessage(), "INTERNAL_ERROR"); + } + } } diff --git a/test_data_operations.py b/test_data_operations.py new file mode 100755 index 0000000..54fb9cd --- /dev/null +++ b/test_data_operations.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3 +""" +Test script for data operations in GhydraMCP bridge. +This script tests renaming and changing data types. +""" +import json +import logging +import sys +import time +from urllib.parse import quote + +import anyio +from mcp.client.session import ClientSession +from mcp.client.stdio import StdioServerParameters, stdio_client + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("data_test") + +async def test_data_operations(): + """Test data operations using the MCP client""" + # Configure the server parameters + server_parameters = StdioServerParameters( + command=sys.executable, + args=["bridge_mcp_hydra.py"], + ) + + # Connect to the bridge + logger.info("Connecting to bridge...") + async with stdio_client(server_parameters) as (read_stream, write_stream): + # Create a session + logger.info("Creating session...") + async with ClientSession(read_stream, write_stream) as session: + # Initialize the session + logger.info("Initializing session...") + init_result = await session.initialize() + logger.info(f"Initialization result: {init_result}") + + # List data to find a data item to test with + logger.info("Listing data...") + list_data_result = await session.call_tool( + "list_data_items", + arguments={"port": 8192, "limit": 5} + ) + list_data_data = json.loads(list_data_result.content[0].text) + logger.info(f"List data result: {list_data_data}") + + if "result" not in list_data_data or not list_data_data.get("result"): + logger.error("No data items found - cannot proceed with test") + return + + # Get the first data item for testing + data_item = list_data_data["result"][0] + data_address = data_item.get("address") + original_name = data_item.get("label") + + if not data_address: + logger.error("No address found in data item - cannot proceed with test") + return + + logger.info(f"Testing with data at address {data_address}, original name: {original_name}") + + # Test renaming the data + test_name = f"TEST_DATA_{int(time.time())}" + logger.info(f"Renaming data to {test_name}") + + rename_result = await session.call_tool( + "update_data", + arguments={"port": 8192, "address": data_address, "name": test_name} + ) + + rename_data = json.loads(rename_result.content[0].text) + logger.info(f"Rename result: {rename_data}") + + if not rename_data.get("success", False): + logger.error(f"Failed to rename data: {rename_data.get('error', {}).get('message', 'Unknown error')}") + else: + logger.info("Data renamed successfully") + + # Test changing the data type + test_type = "uint32_t *" # Pointer to uint32_t - adjust as needed for your test data + logger.info(f"Changing data type to {test_type}") + + type_result = await session.call_tool( + "update_data", + arguments={"port": 8192, "address": data_address, "data_type": test_type} + ) + + type_data = json.loads(type_result.content[0].text) + logger.info(f"Change type result: {type_data}") + + if not type_data.get("success", False): + logger.error(f"Failed to change data type: {type_data.get('error', {}).get('message', 'Unknown error')}") + else: + logger.info("Data type changed successfully") + + # Test both operations together + logger.info(f"Restoring original name and trying different type") + + combined_result = await session.call_tool( + "update_data", + arguments={ + "port": 8192, + "address": data_address, + "name": original_name, + "data_type": "uint32_t" + } + ) + + combined_data = json.loads(combined_result.content[0].text) + logger.info(f"Combined update result: {combined_data}") + + if not combined_data.get("success", False): + logger.error(f"Failed to perform combined update: {combined_data.get('error', {}).get('message', 'Unknown error')}") + else: + logger.info("Combined update successful") + +def main(): + """Main entry point""" + try: + anyio.run(test_data_operations) + except Exception as e: + logger.error(f"Error: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_data_simple.py b/test_data_simple.py new file mode 100755 index 0000000..0b6ad08 --- /dev/null +++ b/test_data_simple.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +""" +Direct test for data operations. +""" +import json +import logging +import sys +import requests + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("simple_test") + +def test_create_data(): + address = "08000000" + + # Try data types + types_to_try = ["uint32_t", "int", "dword", "byte", "pointer"] + + for data_type in types_to_try: + logger.info(f"Testing data type: {data_type}") + + url = f"http://localhost:8192/data" + payload = { + "address": address, + "type": data_type, + "newName": f"TEST_{data_type.upper()}" # Include a name for the data + } + + try: + response = requests.post(url, json=payload) + logger.info(f"Status: {response.status_code}") + logger.info(f"Response: {response.text}") + if response.status_code == 200: + logger.info(f"Success with data type {data_type}") + return True + except Exception as e: + logger.error(f"Error: {e}") + + return False + +def main(): + try: + result = test_create_data() + if result: + logger.info("Test successful!") + else: + logger.error("All test data types failed") + except Exception as e: + logger.error(f"Unexpected error: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/test_data_type.py b/test_data_type.py new file mode 100755 index 0000000..1184971 --- /dev/null +++ b/test_data_type.py @@ -0,0 +1,105 @@ +#!/usr/bin/env python3 +""" +Test script for setting data types in GhydraMCP bridge. +""" +import json +import logging +import sys +import time +from urllib.parse import quote + +import anyio +from mcp.client.session import ClientSession +from mcp.client.stdio import StdioServerParameters, stdio_client + +# Setup logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("data_type_test") + +async def test_set_data_type(): + """Test the set_data_type tool""" + # Configure the server parameters + server_parameters = StdioServerParameters( + command=sys.executable, + args=["bridge_mcp_hydra.py"], + ) + + # Connect to the bridge + logger.info("Connecting to bridge...") + async with stdio_client(server_parameters) as (read_stream, write_stream): + # Create a session + logger.info("Creating session...") + async with ClientSession(read_stream, write_stream) as session: + # Initialize the session + logger.info("Initializing session...") + init_result = await session.initialize() + logger.info(f"Initialization result: {init_result}") + + # List tools to make sure our new tool is available + logger.info("Listing tools...") + tools_result = await session.list_tools() + tool_data = json.loads(tools_result.content[0].text) if tools_result.content else None + + tools = tool_data.get("tools", []) if tool_data else [] + tool_names = [t.get("name") for t in tools] + logger.info(f"Available tools: {tool_names}") + + if "set_data_type" not in tool_names: + logger.error("set_data_type tool not found!") + return + + # List data to find a data item to test with + logger.info("Listing data...") + list_data_result = await session.call_tool( + "list_data_items", + arguments={"port": 8192, "limit": 5} + ) + list_data_data = json.loads(list_data_result.content[0].text) + + if "result" not in list_data_data or not list_data_data.get("result"): + logger.error("No data items found - cannot proceed with test") + return + + # Get the first data item for testing + data_item = list_data_data["result"][0] + data_address = data_item.get("address") + original_type = data_item.get("dataType") + + if not data_address: + logger.error("No address found in data item - cannot proceed with test") + return + + logger.info(f"Testing with data at address {data_address}, original type: {original_type}") + + # Test with simple types first + simple_tests = ["uint32_t", "int", "byte", "word", "dword"] + + for test_type in simple_tests: + logger.info(f"Testing type: {test_type}") + set_type_result = await session.call_tool( + "set_data_type", + arguments={"port": 8192, "address": data_address, "data_type": test_type} + ) + + try: + set_type_data = json.loads(set_type_result.content[0].text) + logger.info(f"Result: {set_type_data}") + + if set_type_data.get("success", False): + logger.info(f"Successfully set type to {test_type}") + break + else: + logger.warning(f"Failed to set type to {test_type}: {set_type_data.get('error', {}).get('message', 'Unknown error')}") + except Exception as e: + logger.error(f"Error processing result: {e}") + +def main(): + """Main entry point""" + try: + anyio.run(test_set_data_type) + except Exception as e: + logger.error(f"Error: {e}") + sys.exit(1) + +if __name__ == "__main__": + main() \ No newline at end of file