diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index 555cae1..5e20377 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -580,7 +580,7 @@ def handle_sigint(signum, frame): # Resources provide information that can be loaded directly into context # They focus on data and minimize metadata -@mcp.resource() +@mcp.resource(uri="/instance/{port}") def ghidra_instance(port: int = None) -> dict: """Get detailed information about a Ghidra instance and the loaded program @@ -620,20 +620,19 @@ def ghidra_instance(port: int = None) -> dict: return instance_info -@mcp.resource() -def decompiled_function(name: str = None, address: str = None, port: int = None) -> str: - """Get decompiled C code for a function +@mcp.resource(uri="/instance/{port}/function/decompile/address/{address}") +def decompiled_function_by_address(port: int = None, address: str = None) -> str: + """Get decompiled C code for a function by address Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) + port: Specific Ghidra instance port + address: Function address in hex format Returns: str: The decompiled C code as a string, or error message """ - if not name and not address: - return "Error: Either name or address parameter is required" + if not address: + return "Error: Address parameter is required" port = _get_instance_port(port) @@ -642,10 +641,7 @@ def decompiled_function(name: str = None, address: str = None, port: int = None) "style": "normalize" } - if address: - endpoint = f"functions/{address}/decompile" - else: - endpoint = f"functions/by-name/{quote(name)}/decompile" + endpoint = f"functions/{address}/decompile" response = safe_get(port, endpoint, params) simplified = simplify_response(response) @@ -673,27 +669,72 @@ def decompiled_function(name: str = None, address: str = None, port: int = None) return "Error: Could not extract decompiled code from response" -@mcp.resource() -def function_info(name: str = None, address: str = None, port: int = None) -> dict: - """Get detailed information about a function +@mcp.resource(uri="/instance/{port}/function/decompile/name/{name}") +def decompiled_function_by_name(port: int = None, name: str = None) -> str: + """Get decompiled C code for a function by name Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) + port: Specific Ghidra instance port + name: Function name + + Returns: + str: The decompiled C code as a string, or error message + """ + if not name: + return "Error: Name parameter is required" + + port = _get_instance_port(port) + + params = { + "syntax_tree": "false", + "style": "normalize" + } + + endpoint = f"functions/by-name/{quote(name)}/decompile" + + response = safe_get(port, endpoint, params) + simplified = simplify_response(response) + + # For a resource, we want to directly return just the decompiled code + if (not isinstance(simplified, dict) or + not simplified.get("success", False) or + "result" not in simplified): + error_message = "Error: Could not decompile function" + if isinstance(simplified, dict) and "error" in simplified: + if isinstance(simplified["error"], dict): + error_message = simplified["error"].get("message", error_message) + else: + error_message = str(simplified["error"]) + return error_message + + # Extract just the decompiled code text + result = simplified["result"] + + # Different endpoints may return the code in different fields, try all of them + if isinstance(result, dict): + for key in ["decompiled_text", "ccode", "decompiled"]: + if key in result: + return result[key] + + return "Error: Could not extract decompiled code from response" + +@mcp.resource(uri="/instance/{port}/function/info/address/{address}") +def function_info_by_address(port: int = None, address: str = None) -> dict: + """Get detailed information about a function by address + + Args: + port: Specific Ghidra instance port + address: Function address in hex format Returns: dict: Complete function information including signature, parameters, etc. """ - if not name and not address: - return {"error": "Either name or address parameter is required"} + if not address: + return {"error": "Address parameter is required"} port = _get_instance_port(port) - if address: - endpoint = f"functions/{address}" - else: - endpoint = f"functions/by-name/{quote(name)}" + endpoint = f"functions/{address}" response = safe_get(port, endpoint) simplified = simplify_response(response) @@ -709,27 +750,115 @@ def function_info(name: str = None, address: str = None, port: int = None) -> di # Return just the function data without API metadata return simplified["result"] -@mcp.resource() -def disassembly(name: str = None, address: str = None, port: int = None) -> str: - """Get disassembled instructions for a function +@mcp.resource(uri="/instance/{port}/function/info/name/{name}") +def function_info_by_name(port: int = None, name: str = None) -> dict: + """Get detailed information about a function by name Args: - name: Function name (mutually exclusive with address) - address: Function address in hex format (mutually exclusive with address) - port: Specific Ghidra instance port (optional) + port: Specific Ghidra instance port + name: Function name + + Returns: + dict: Complete function information including signature, parameters, etc. + """ + if not name: + return {"error": "Name parameter is required"} + + port = _get_instance_port(port) + + endpoint = f"functions/by-name/{quote(name)}" + + response = safe_get(port, endpoint) + simplified = simplify_response(response) + + if (not isinstance(simplified, dict) or + not simplified.get("success", False) or + "result" not in simplified): + error = {"error": "Could not get function information"} + if isinstance(simplified, dict) and "error" in simplified: + error["error_details"] = simplified["error"] + return error + + # Return just the function data without API metadata + return simplified["result"] + +@mcp.resource(uri="/instance/{port}/function/disassembly/address/{address}") +def disassembly_by_address(port: int = None, address: str = None) -> str: + """Get disassembled instructions for a function by address + + Args: + port: Specific Ghidra instance port + address: Function address in hex format Returns: str: Formatted disassembly listing as a string """ - if not name and not address: - return "Error: Either name or address parameter is required" + if not address: + return "Error: Address parameter is required" port = _get_instance_port(port) - if address: - endpoint = f"functions/{address}/disassembly" - else: - endpoint = f"functions/by-name/{quote(name)}/disassembly" + endpoint = f"functions/{address}/disassembly" + + response = safe_get(port, endpoint) + simplified = simplify_response(response) + + if (not isinstance(simplified, dict) or + not simplified.get("success", False) or + "result" not in simplified): + error_message = "Error: Could not get disassembly" + if isinstance(simplified, dict) and "error" in simplified: + if isinstance(simplified["error"], dict): + error_message = simplified["error"].get("message", error_message) + else: + error_message = str(simplified["error"]) + return error_message + + # For a resource, we want to directly return just the disassembly text + result = simplified["result"] + + # Check if we have a disassembly_text field already + if isinstance(result, dict) and "disassembly_text" in result: + return result["disassembly_text"] + + # Otherwise if we have raw instructions, format them ourselves + if isinstance(result, dict) and "instructions" in result and isinstance(result["instructions"], list): + disasm_text = "" + for instr in result["instructions"]: + if isinstance(instr, dict): + addr = instr.get("address", "") + mnemonic = instr.get("mnemonic", "") + operands = instr.get("operands", "") + bytes_str = instr.get("bytes", "") + + # Format: address: bytes mnemonic operands + disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" + + return disasm_text + + # If we have a direct disassembly field, try that as well + if isinstance(result, dict) and "disassembly" in result: + return result["disassembly"] + + return "Error: Could not extract disassembly from response" + +@mcp.resource(uri="/instance/{port}/function/disassembly/name/{name}") +def disassembly_by_name(port: int = None, name: str = None) -> str: + """Get disassembled instructions for a function by name + + Args: + port: Specific Ghidra instance port + name: Function name + + Returns: + str: Formatted disassembly listing as a string + """ + if not name: + return "Error: Name parameter is required" + + port = _get_instance_port(port) + + endpoint = f"functions/by-name/{quote(name)}/disassembly" response = safe_get(port, endpoint) simplified = simplify_response(response) @@ -789,23 +918,36 @@ def analyze_function_prompt(name: str = None, address: str = None, port: int = N # Get function name if only address is provided if address and not name: - fn_info = function_info(address=address, port=port) + fn_info = function_info_by_address(address=address, port=port) if isinstance(fn_info, dict) and "name" in fn_info: name = fn_info["name"] # Create the template that guides analysis + decompiled = "" + disasm = "" + fn_info = None + + if address: + decompiled = decompiled_function_by_address(address=address, port=port) + disasm = disassembly_by_address(address=address, port=port) + fn_info = function_info_by_address(address=address, port=port) + elif name: + decompiled = decompiled_function_by_name(name=name, port=port) + disasm = disassembly_by_name(name=name, port=port) + fn_info = function_info_by_name(name=name, port=port) + return { "prompt": f""" Analyze the following function: {name or address} Decompiled code: ```c - {decompiled_function(name=name, address=address, port=port)} + {decompiled} ``` Disassembly: ``` - {disassembly(name=name, address=address, port=port)} + {disasm} ``` 1. What is the purpose of this function? @@ -815,7 +957,7 @@ def analyze_function_prompt(name: str = None, address: str = None, port: int = N 5. Describe the algorithm or process being implemented. """, "context": { - "function_info": function_info(name=name, address=address, port=port) + "function_info": fn_info } } @@ -832,18 +974,31 @@ def identify_vulnerabilities_prompt(name: str = None, address: str = None, port: # Get function name if only address is provided if address and not name: - fn_info = function_info(address=address, port=port) + fn_info = function_info_by_address(address=address, port=port) if isinstance(fn_info, dict) and "name" in fn_info: name = fn_info["name"] # Create the template focused on security analysis + decompiled = "" + disasm = "" + fn_info = None + + if address: + decompiled = decompiled_function_by_address(address=address, port=port) + disasm = disassembly_by_address(address=address, port=port) + fn_info = function_info_by_address(address=address, port=port) + elif name: + decompiled = decompiled_function_by_name(name=name, port=port) + disasm = disassembly_by_name(name=name, port=port) + fn_info = function_info_by_name(name=name, port=port) + return { "prompt": f""" Analyze the following function for security vulnerabilities: {name or address} Decompiled code: ```c - {decompiled_function(name=name, address=address, port=port)} + {decompiled} ``` Look for these vulnerability types: @@ -863,8 +1018,8 @@ def identify_vulnerabilities_prompt(name: str = None, address: str = None, port: - Recommend a fix """, "context": { - "function_info": function_info(name=name, address=address, port=port), - "disassembly": disassembly(name=name, address=address, port=port) + "function_info": fn_info, + "disassembly": disasm } } diff --git a/test_mcp_client.py b/test_mcp_client.py index 755209e..e6375b2 100644 --- a/test_mcp_client.py +++ b/test_mcp_client.py @@ -78,164 +78,159 @@ async def test_bridge(): logger.info("Initializing session...") init_result = await session.initialize() logger.info(f"Initialization result: {init_result}") - + # List tools logger.info("Listing tools...") tools_result = await session.list_tools() - # logger.info(f"Tools result: {tools_result}") - - # Call the discover_instances tool - logger.info("Calling discover_instances tool...") - discover_instances_result = await session.call_tool("discover_instances") + # logger.info(f"Tools result: {tools_result}") # Optional: uncomment for verbose tool listing + + # Call the instances_discover tool + logger.info("Calling instances_discover tool...") + discover_instances_result = await session.call_tool("instances_discover") logger.info(f"Discover instances result: {discover_instances_result}") - - # Call the list_instances tool - logger.info("Calling list_instances tool...") - list_instances_result = await session.call_tool("list_instances") + + # Call the instances_list tool + logger.info("Calling instances_list tool...") + list_instances_result = await session.call_tool("instances_list") logger.info(f"List instances result: {list_instances_result}") - - # Call the list_functions tool with the new HATEOAS API - logger.info("Calling list_functions tool...") + + # Set the current instance to use for subsequent calls + logger.info(f"Setting current instance to port {GHYDRAMCP_TEST_PORT}...") + use_instance_result = await session.call_tool("instances_use", arguments={"port": GHYDRAMCP_TEST_PORT}) + logger.info(f"Use instance result: {use_instance_result}") + + # Call the functions_list tool (no port needed now) + logger.info("Calling functions_list tool...") list_functions_result = await session.call_tool( - "list_functions", - arguments={"port": GHYDRAMCP_TEST_PORT, "offset": 0, "limit": 5} + "functions_list", + arguments={"offset": 0, "limit": 5} # No port needed ) logger.info(f"List functions result: {list_functions_result}") - - # Test the current program endpoint - logger.info("Calling get_program_info tool...") - current_program_result = await session.call_tool( - "get_program_info", - arguments={"port": GHYDRAMCP_TEST_PORT} - ) - logger.info(f"Current program result: {current_program_result}") + + # Test the current instance endpoint + logger.info("Calling instances_current tool...") + current_program_result = await session.call_tool("instances_current") # No args needed + logger.info(f"Current instance result: {current_program_result}") + # Add assertion for current instance result structure if needed + current_data = json.loads(current_program_result.content[0].text) + assert "port" in current_data, "Missing port in instances_current result" + assert "program_name" in current_data, "Missing program_name in instances_current result" # Test mutating operations by changing and reverting logger.info("Testing mutating operations...") - + try: - # Get a known function to test with from list_functions result + # Get a known function to test with from functions_list result list_funcs = await session.call_tool( - "list_functions", - arguments={"port": GHYDRAMCP_TEST_PORT, "offset": 0, "limit": 5} + "functions_list", + arguments={"offset": 0, "limit": 5} # No port needed ) - + if not list_funcs or not list_funcs.content: - logger.warning("No functions found - skipping mutating tests") + logger.warning("No functions found via functions_list - skipping mutating tests") return - - # Parse the JSON response from list_functions using helper + + # Parse the JSON response from functions_list using helper try: list_funcs_data = await assert_standard_mcp_success_response(list_funcs.content, expected_result_type=list) func_list = list_funcs_data.get("result", []) if not func_list: - logger.warning("No functions in list_functions result - skipping mutating tests") + logger.warning("No functions in functions_list result - skipping mutating tests") return # Get first function's name and address + # Ensure the item is a dictionary before accessing keys first_func = func_list[0] + if not isinstance(first_func, dict): + logger.warning(f"First item in functions_list is not a dict: {first_func} - skipping mutating tests") + return + func_name = first_func.get("name", "") func_address = first_func.get("address", "") if not func_name or not func_address: - logger.warning("No function name/address found in list_functions result - skipping mutating tests") + logger.warning("No function name/address found in functions_list result - skipping mutating tests") return - except AssertionError as e: - logger.warning(f"Error processing list_functions data: {e} - skipping mutating tests") + except (AssertionError, IndexError, TypeError) as e: + logger.warning(f"Error processing functions_list data: {e} - skipping mutating tests") return - # Test function renaming + # Test function renaming using functions_rename original_name = func_name - test_name = f"{func_name}_test" + test_name = f"{func_name}_test_mcp" # Make name slightly more unique - # Test successful rename operations using rename_function - rename_args = {"port": GHYDRAMCP_TEST_PORT, "name": original_name, "new_name": test_name} - logger.info(f"Calling rename_function with args: {rename_args}") - rename_result = await session.call_tool("rename_function", arguments=rename_args) + # Test successful rename operations using functions_rename (no port needed) + rename_args = {"old_name": original_name, "new_name": test_name} # Use old_name instead of name + logger.info(f"Calling functions_rename with args: {rename_args}") + rename_result = await session.call_tool("functions_rename", arguments=rename_args) rename_data = json.loads(rename_result.content[0].text) # Parse simple response assert rename_data.get("success") is True, f"Rename failed: {rename_data}" logger.info(f"Rename result: {rename_result}") - - # Verify rename by getting the function - renamed_func = await session.call_tool("get_function", arguments={"port": 8192, "name": test_name}) + + # Verify rename by getting the function using functions_get (no port needed) + renamed_func = await session.call_tool("functions_get", arguments={"name": test_name}) renamed_data = await assert_standard_mcp_success_response(renamed_func.content, expected_result_type=dict) assert renamed_data.get("result", {}).get("name") == test_name, f"Renamed function has wrong name: {renamed_data}" logger.info(f"Renamed function result: {renamed_func}") - - # Rename back to original - revert_args = {"port": GHYDRAMCP_TEST_PORT, "name": test_name, "new_name": original_name} - logger.info(f"Calling rename_function with args: {revert_args}") - revert_result = await session.call_tool("rename_function", arguments=revert_args) + + # Rename back to original using functions_rename (no port needed) + revert_args = {"old_name": test_name, "new_name": original_name} # Use old_name + logger.info(f"Calling functions_rename with args: {revert_args}") + revert_result = await session.call_tool("functions_rename", arguments=revert_args) revert_data = json.loads(revert_result.content[0].text) # Parse simple response assert revert_data.get("success") is True, f"Revert rename failed: {revert_data}" logger.info(f"Revert rename result: {revert_result}") - - # Verify revert by getting the function - original_func = await session.call_tool("get_function", arguments={"port": GHYDRAMCP_TEST_PORT, "name": original_name}) + + # Verify revert by getting the function using functions_get (no port needed) + original_func = await session.call_tool("functions_get", arguments={"name": original_name}) original_data = await assert_standard_mcp_success_response(original_func.content, expected_result_type=dict) assert original_data.get("result", {}).get("name") == original_name, f"Original function has wrong name: {original_data}" logger.info(f"Original function result: {original_func}") - # Test get_function with address parameter - logger.info(f"Calling get_function with address: {func_address}") - get_by_addr_result = await session.call_tool("get_function", arguments={"port": GHYDRAMCP_TEST_PORT, "address": func_address}) + # Test functions_get with address parameter (no port needed) + logger.info(f"Calling functions_get with address: {func_address}") + get_by_addr_result = await session.call_tool("functions_get", arguments={"address": func_address}) get_by_addr_data = await assert_standard_mcp_success_response(get_by_addr_result.content, expected_result_type=dict) result_data = get_by_addr_data.get("result", {}) - assert "name" in result_data, "Missing name field in get_function result" - assert "address" in result_data, "Missing address field in get_function result" - assert "signature" in result_data, "Missing signature field in get_function result" - assert result_data.get("name") == original_name, f"Wrong name in get_function: {result_data.get('name')}" + assert "name" in result_data, "Missing name field in functions_get result" + assert "address" in result_data, "Missing address field in functions_get result" + assert "signature" in result_data, "Missing signature field in functions_get result" + assert result_data.get("name") == original_name, f"Wrong name in functions_get: {result_data.get('name')}" logger.info(f"Get function by address result: {get_by_addr_result}") - # Test decompile_function - logger.info(f"Calling decompile_function with address: {func_address}") - decompile_result = await session.call_tool("decompile_function", arguments={"port": GHYDRAMCP_TEST_PORT, "address": func_address}) + # Test functions_decompile (no port needed) + logger.info(f"Calling functions_decompile with address: {func_address}") + decompile_result = await session.call_tool("functions_decompile", arguments={"address": func_address}) decompile_data = await assert_standard_mcp_success_response(decompile_result.content, expected_result_type=dict) - - # The decompiled code might be in different fields depending on version - has_decompiled = False - if "decompiled_code" in decompile_data: - has_decompiled = True - elif "decompiled_text" in decompile_data: - has_decompiled = True - elif "result" in decompile_data and isinstance(decompile_data["result"], dict): - result = decompile_data["result"] - if "ccode" in result or "decompiled" in result or "decompiled_text" in result: - has_decompiled = True - - assert has_decompiled, f"Decompile result missing decompiled code: {decompile_data}" + + # Check for decompiled code (bridge adds 'decompiled_code' field) + assert "decompiled_code" in decompile_data, f"Decompile result missing decompiled_code field: {decompile_data}" + assert isinstance(decompile_data["decompiled_code"], str), "decompiled_code should be a string" logger.info(f"Decompile function result: {decompile_result}") - - # Test disassemble_function - logger.info(f"Calling disassemble_function with address: {func_address}") - disassemble_result = await session.call_tool("disassemble_function", arguments={"port": GHYDRAMCP_TEST_PORT, "address": func_address}) + + # Test functions_disassemble (no port needed) + logger.info(f"Calling functions_disassemble with address: {func_address}") + disassemble_result = await session.call_tool("functions_disassemble", arguments={"address": func_address}) disassemble_data = json.loads(disassemble_result.content[0].text) assert disassemble_data.get("success") is True, f"Disassemble failed: {disassemble_data}" - - # Check for disassembly text in the simplified format - has_disassembly = False - if "disassembly" in disassemble_data: - has_disassembly = True - elif "result" in disassemble_data and isinstance(disassemble_data["result"], dict): - result = disassemble_data["result"] - if "disassembly_text" in result: - has_disassembly = True - elif "instructions" in result: - has_disassembly = True - - assert has_disassembly, f"Disassembly result missing disassembly text: {disassemble_data}" - - # Check additional function info - if "function_name" in disassemble_data: - assert isinstance(disassemble_data["function_name"], str), "function_name should be a string" - if "function_address" in disassemble_data: - assert isinstance(disassemble_data["function_address"], str), "function_address should be a string" - + + # Check for disassembly text (bridge adds 'disassembly_text' field) + assert "result" in disassemble_data and isinstance(disassemble_data["result"], dict), "Disassembly result missing 'result' object" + result = disassemble_data["result"] + assert "disassembly_text" in result, f"Disassembly result missing disassembly_text field: {disassemble_data}" + assert isinstance(result["disassembly_text"], str), "disassembly_text should be a string" + + # Check additional function info if present + if "name" in result: + assert isinstance(result["name"], str), "function name should be a string" + if "address" in result: + assert isinstance(result["address"], str), "function address should be a string" + logger.info(f"Disassemble function result: {disassemble_result}") - # Test get_function_variables instead of list_variables - logger.info("Calling get_function_variables tool...") - function_vars_result = await session.call_tool("get_function_variables", arguments={"port": 8192, "address": func_address}) + # Test functions_get_variables (no port needed) + logger.info("Calling functions_get_variables tool...") + function_vars_result = await session.call_tool("functions_get_variables", arguments={"address": func_address}) try: vars_data = await assert_standard_mcp_success_response(function_vars_result.content, expected_result_type=dict) if "result" in vars_data and isinstance(vars_data["result"], dict) and "variables" in vars_data["result"]: @@ -243,183 +238,124 @@ async def test_bridge(): if variables_list and len(variables_list) > 0: for var in variables_list: assert "name" in var, f"Variable missing name: {var}" + assert "dataType" in var, f"Variable missing dataType: {var}" logger.info(f"Function variables result: {function_vars_result}") else: logger.info("Function variables available but no variables found in function.") except (AssertionError, KeyError) as e: logger.warning(f"Could not validate function variables: {e}") - # Test comment operations using set_comment - test_comment = "Test comment from MCP client" - comment_args = {"port": 8192, "address": func_address, "comment": test_comment, "comment_type": "plate"} - logger.info(f"Calling set_comment with args: {comment_args}") - comment_result = await session.call_tool("set_comment", arguments=comment_args) - comment_data = json.loads(comment_result.content[0].text) - assert comment_data.get("success") is True, f"Add comment failed: {comment_data}" - logger.info(f"Add comment result: {comment_result}") - - # Remove comment - remove_comment_args = {"port": 8192, "address": func_address, "comment": "", "comment_type": "plate"} - logger.info(f"Calling set_comment with args: {remove_comment_args}") - remove_comment_result = await session.call_tool("set_comment", arguments=remove_comment_args) - remove_data = json.loads(remove_comment_result.content[0].text) - assert remove_data.get("success") is True, f"Remove comment failed: {remove_data}" - logger.info(f"Remove comment result: {remove_comment_result}") - - # Test comments using set_decompiler_comment (which is a convenience wrapper for set_comment) - test_comment = "Test decompiler comment from MCP client" - decompiler_comment_args = {"port": 8192, "address": func_address, "comment": test_comment} - logger.info(f"Calling set_decompiler_comment with args: {decompiler_comment_args}") - decompiler_comment_result = await session.call_tool("set_decompiler_comment", arguments=decompiler_comment_args) - decompiler_comment_data = json.loads(decompiler_comment_result.content[0].text) - assert decompiler_comment_data.get("success") is True, f"Add decompiler comment failed: {decompiler_comment_data}" - logger.info(f"Add decompiler comment result: {decompiler_comment_result}") - - # Remove decompiler comment - remove_decompiler_comment_args = {"port": 8192, "address": func_address, "comment": ""} - logger.info(f"Calling set_decompiler_comment with args: {remove_decompiler_comment_args}") - remove_decompiler_comment_result = await session.call_tool("set_decompiler_comment", arguments=remove_decompiler_comment_args) - remove_decompiler_data = json.loads(remove_decompiler_comment_result.content[0].text) - assert remove_decompiler_data.get("success") is True, f"Remove decompiler comment failed: {remove_decompiler_data}" - logger.info(f"Remove decompiler comment result: {remove_decompiler_comment_result}") - + # REMOVED: Tests for set_comment and set_decompiler_comment as tools no longer exist + # Test expected failure cases - # Try to rename non-existent function - bad_rename_args = {"port": 8192, "name": "nonexistent_function", "new_name": "should_fail"} - logger.info(f"Calling rename_function with args: {bad_rename_args}") + # Try to rename non-existent function using functions_rename + bad_rename_args = {"old_name": "nonexistent_function_mcp", "new_name": "should_fail"} + logger.info(f"Calling functions_rename with args: {bad_rename_args}") try: - bad_rename_result = await session.call_tool("rename_function", arguments=bad_rename_args) + bad_rename_result = await session.call_tool("functions_rename", arguments=bad_rename_args) logger.info(f"Bad rename result: {bad_rename_result}") # Log the response bad_rename_data = json.loads(bad_rename_result.content[0].text) assert bad_rename_data.get("success") is False, f"Renaming non-existent function should fail, but got: {bad_rename_data}" except Exception as e: # It's also acceptable if the tool call itself fails, as long as it doesn't succeed - logger.info(f"Expected failure: rename_function properly rejected bad parameters: {e}") + logger.info(f"Expected failure: functions_rename properly rejected bad parameters: {e}") - # Try to get non-existent function + # Try to get non-existent function using functions_get bad_get_result = await session.call_tool( - "get_function", - arguments={"port": 8192, "name": "nonexistent_function"} + "functions_get", + arguments={"name": "nonexistent_function_mcp"} ) logger.info(f"Bad get result: {bad_get_result}") # Log the response bad_get_data = json.loads(bad_get_result.content[0].text) assert bad_get_data.get("success") is False, f"Getting non-existent function should fail, but got: {bad_get_data}" - # Try to comment on invalid address - bad_comment_args = {"port": 8192, "address": "0xinvalid", "comment": "should fail", "comment_type": "plate"} - logger.info(f"Calling set_comment with args: {bad_comment_args}") - bad_comment_result = await session.call_tool("set_comment", arguments=bad_comment_args) - bad_comment_data = json.loads(bad_comment_result.content[0].text) - assert bad_comment_data.get("success") is False, "Commenting on invalid address should fail" + # REMOVED: Test for commenting on invalid address as set_comment tool no longer exists + # REMOVED: Tests for get_current_address and get_current_function as tools no longer exist - # Test get_current_address - logger.info("Calling get_current_address tool...") - current_addr_result = await session.call_tool("get_current_address", arguments={"port": 8192}) - current_addr_data = await assert_standard_mcp_success_response(current_addr_result.content, expected_result_type=dict) - assert "address" in current_addr_data.get("result", {}), "Missing address in get_current_address result" - assert isinstance(current_addr_data.get("result", {}).get("address", ""), str), "Address should be a string" - logger.info(f"Get current address result: {current_addr_result}") - - # Test get_current_function - logger.info("Calling get_current_function tool...") - current_func_result = await session.call_tool("get_current_function", arguments={"port": 8192}) - current_func_data = await assert_standard_mcp_success_response(current_func_result.content, expected_result_type=dict) - result_data = current_func_data.get("result", {}) - assert "name" in result_data, "Missing name in get_current_function result" - assert "address" in result_data, "Missing address in get_current_function result" - assert "signature" in result_data, "Missing signature in get_current_function result" - logger.info(f"Get current function result: {current_func_result}") - - # Test read_memory functionality - logger.info(f"Calling read_memory with address: {func_address}") - read_memory_result = await session.call_tool("read_memory", arguments={"port": 8192, "address": func_address, "length": 16}) + # Test memory_read functionality (no port needed) + logger.info(f"Calling memory_read with address: {func_address}") + read_memory_result = await session.call_tool("memory_read", arguments={"address": func_address, "length": 16}) read_memory_data = json.loads(read_memory_result.content[0].text) assert read_memory_data.get("success") is True, f"Read memory failed: {read_memory_data}" - assert "hexBytes" in read_memory_data, "Missing hexBytes in read_memory result" - assert "rawBytes" in read_memory_data, "Missing rawBytes in read_memory result" - assert read_memory_data.get("address") == func_address, f"Wrong address in read_memory result: {read_memory_data.get('address')}" + # Bridge simplification puts data directly in response + assert "hexBytes" in read_memory_data, "Missing hexBytes in memory_read result" + assert "rawBytes" in read_memory_data, "Missing rawBytes in memory_read result" + assert read_memory_data.get("address") == func_address, f"Wrong address in memory_read result: {read_memory_data.get('address')}" logger.info(f"Read memory result: {read_memory_result}") - - # Test data operations (create, rename, change type) + + # Test data operations (create, rename, change type, delete) using namespaced tools logger.info("Testing data operations...") try: # Get a memory address to create test data data_address = func_address - - # First create test data - create_data_args = {"port": 8192, "address": data_address, "data_type": "uint32_t"} - logger.info(f"Calling create_data with args: {create_data_args}") - create_data_result = await session.call_tool("create_data", arguments=create_data_args) + original_data_type = "undefined" # Placeholder, might not exist initially + + # First create test data using data_create (no port needed) + create_data_args = {"address": data_address, "data_type": "uint32_t"} + logger.info(f"Calling data_create with args: {create_data_args}") + create_data_result = await session.call_tool("data_create", arguments=create_data_args) create_data_response = json.loads(create_data_result.content[0].text) assert create_data_response.get("success") is True, f"Create data failed: {create_data_response}" logger.info(f"Create data result: {create_data_result}") - - # Test Case 1: Data rename operation (name only) - test_data_name = "test_data_item" - rename_data_args = {"port": 8192, "address": data_address, "name": test_data_name} - logger.info(f"Calling rename_data with args: {rename_data_args}") - rename_data_result = await session.call_tool("rename_data", arguments=rename_data_args) + original_data_type = "uint32_t" # Update original type + + # Test Case 1: Data rename operation using data_rename (no port needed) + test_data_name = "test_data_item_mcp" + rename_data_args = {"address": data_address, "name": test_data_name} + logger.info(f"Calling data_rename with args: {rename_data_args}") + rename_data_result = await session.call_tool("data_rename", arguments=rename_data_args) rename_data_response = json.loads(rename_data_result.content[0].text) assert rename_data_response.get("success") is True, f"Rename data failed: {rename_data_response}" logger.info(f"Rename data result: {rename_data_result}") - - # Verify the name was changed + + # Verify the name was changed (check the result field) if rename_data_response.get("result", {}).get("name") != test_data_name: logger.warning(f"Rename operation didn't set the expected name. Got: {rename_data_response.get('result', {}).get('name')}") - - # Test Case 2: Data type change operation (type only) - change_type_args = {"port": 8192, "address": data_address, "data_type": "int"} - logger.info(f"Calling set_data_type with args: {change_type_args}") - change_type_result = await session.call_tool("set_data_type", arguments=change_type_args) + + # Test Case 2: Data type change operation using data_set_type (no port needed) + change_type_args = {"address": data_address, "data_type": "int"} + logger.info(f"Calling data_set_type with args: {change_type_args}") + change_type_result = await session.call_tool("data_set_type", arguments=change_type_args) change_type_response = json.loads(change_type_result.content[0].text) assert change_type_response.get("success") is True, f"Change data type failed: {change_type_response}" logger.info(f"Change data type result: {change_type_result}") - + # Verify the type was changed but name was preserved result = change_type_response.get("result", {}) if result.get("dataType") != "int": logger.warning(f"Type change operation didn't set the expected type. Got: {result.get('dataType')}") if result.get("name") != test_data_name: logger.warning(f"Type change operation didn't preserve the name. Expected: {test_data_name}, Got: {result.get('name')}") - - # Test Case 3: Combined update operation (both name and type) - update_data_args = { - "port": 8192, - "address": data_address, - "name": "updated_data_item", - "data_type": "byte" - } - logger.info(f"Calling update_data with args: {update_data_args}") - update_data_result = await session.call_tool("update_data", arguments=update_data_args) - update_data_response = json.loads(update_data_result.content[0].text) - assert update_data_response.get("success") is True, f"Update data failed: {update_data_response}" - logger.info(f"Update data result: {update_data_result}") - - # Verify both name and type were changed - result = update_data_response.get("result", {}) - if result.get("name") != "updated_data_item": - logger.warning(f"Update operation didn't set the expected name. Got: {result.get('name')}") - if result.get("dataType") != "byte": - logger.warning(f"Update operation didn't set the expected type. Got: {result.get('dataType')}") - - # Clean up by restoring original data type - restore_type_args = {"port": 8192, "address": data_address, "data_type": "uint32_t"} - logger.info(f"Restoring data type with args: {restore_type_args}") - restore_type_result = await session.call_tool("set_data_type", arguments=restore_type_args) - restore_type_response = json.loads(restore_type_result.content[0].text) - assert restore_type_response.get("success") is True, f"Restore data type failed: {restore_type_response}" - + + # REMOVED: Test Case 3 (Combined update) as update_data tool no longer exists + + # Clean up by deleting the created data using data_delete + delete_data_args = {"address": data_address} + logger.info(f"Deleting data with args: {delete_data_args}") + delete_data_result = await session.call_tool("data_delete", arguments=delete_data_args) + delete_data_response = json.loads(delete_data_result.content[0].text) + assert delete_data_response.get("success") is True, f"Delete data failed: {delete_data_response}" + logger.info(f"Delete data result: {delete_data_result}") + except Exception as e: - logger.warning(f"Error testing data operations: {e} - This is not critical") - - # Test callgraph functionality - handle possible failure gracefully - if func_address: - logger.info(f"Calling get_callgraph with address: {func_address}") + logger.warning(f"Error testing data operations: {e} - This is not critical. Attempting cleanup.") + # Attempt cleanup even if tests failed mid-way try: - callgraph_result = await session.call_tool("get_callgraph", arguments={"port": 8192, "address": func_address}) + delete_data_args = {"address": data_address} + await session.call_tool("data_delete", arguments=delete_data_args) + logger.info("Data cleanup attempted.") + except Exception as cleanup_e: + logger.error(f"Data cleanup failed: {cleanup_e}") + + + # Test callgraph functionality using analysis_get_callgraph (no port needed) + if func_address: + logger.info(f"Calling analysis_get_callgraph with address: {func_address}") + try: + callgraph_result = await session.call_tool("analysis_get_callgraph", arguments={"function": func_address}) callgraph_data = json.loads(callgraph_result.content[0].text) if callgraph_data.get("success"): - assert "result" in callgraph_data, "Missing result in get_callgraph response" + assert "result" in callgraph_data, "Missing result in analysis_get_callgraph response" # The result could be either a dict with nodes/edges or a direct graph representation logger.info(f"Get callgraph result: successful") else: @@ -427,60 +363,58 @@ async def test_bridge(): logger.info(f"Get callgraph result: failed - {callgraph_data.get('error', {}).get('message', 'Unknown error')}") except Exception as e: logger.warning(f"Error in callgraph test: {e} - This is not critical") - - # Test function signature operations + + # Test function signature operations using functions_set_signature logger.info("Testing function signature operations...") try: - # Get current signature - get_func_for_sig = await session.call_tool("get_function", arguments={"port": 8192, "address": func_address}) + # Get current signature using functions_get + get_func_for_sig = await session.call_tool("functions_get", arguments={"address": func_address}) get_func_for_sig_data = await assert_standard_mcp_success_response(get_func_for_sig.content, expected_result_type=dict) original_signature = get_func_for_sig_data.get("result", {}).get("signature", "") - + if not original_signature: logger.warning("Could not get original signature - skipping signature test") else: # Create test signature by adding parameters - modified_signature = f"int {func_name}(uint32_t *data, int block_count, uint32_t *key)" + modified_signature = f"int {func_name}(uint32_t *mcp_data, int mcp_count, uint32_t *mcp_key)" logger.info(f"Original signature: {original_signature}") logger.info(f"Setting function signature to: {modified_signature}") - - # Set new signature - set_sig_result = await session.call_tool("set_function_signature", - arguments={"port": 8192, - "address": func_address, + + # Set new signature using functions_set_signature (no port needed) + set_sig_result = await session.call_tool("functions_set_signature", + arguments={"address": func_address, "signature": modified_signature}) set_sig_data = json.loads(set_sig_result.content[0].text) assert set_sig_data.get("success") is True, f"Set signature failed: {set_sig_data}" logger.info(f"Set signature result: {set_sig_result}") - - # Verify the change - verify_sig_result = await session.call_tool("get_function", arguments={"port": 8192, "address": func_address}) + + # Verify the change using functions_get + verify_sig_result = await session.call_tool("functions_get", arguments={"address": func_address}) verify_sig_data = await assert_standard_mcp_success_response(verify_sig_result.content, expected_result_type=dict) new_signature = verify_sig_data.get("result", {}).get("signature", "") - assert "uint32_t *data" in new_signature, f"Signature not properly updated: {new_signature}" + assert "uint32_t *mcp_data" in new_signature, f"Signature not properly updated: {new_signature}" logger.info(f"Updated signature: {new_signature}") - - # Restore original signature + + # Restore original signature using functions_set_signature logger.info(f"Restoring original signature: {original_signature}") - restore_sig_result = await session.call_tool("set_function_signature", - arguments={"port": 8192, - "address": func_address, + restore_sig_result = await session.call_tool("functions_set_signature", + arguments={"address": func_address, "signature": original_signature}) restore_sig_data = json.loads(restore_sig_result.content[0].text) assert restore_sig_data.get("success") is True, f"Restore signature failed: {restore_sig_data}" logger.info(f"Restore signature result: {restore_sig_result}") - - # Verify restoration - final_func_result = await session.call_tool("get_function", arguments={"port": 8192, "address": func_address}) + + # Verify restoration using functions_get + final_func_result = await session.call_tool("functions_get", arguments={"address": func_address}) final_func_data = await assert_standard_mcp_success_response(final_func_result.content, expected_result_type=dict) final_signature = final_func_data.get("result", {}).get("signature", "") assert final_signature == original_signature, f"Signature not properly restored: {final_signature}" logger.info(f"Restored signature: {final_signature}") except Exception as e: logger.warning(f"Error in signature test: {e} - This is not critical") - + except Exception as e: - logger.error(f"Error testing mutating operations: {e}") + logger.error(f"Error testing mutating operations: {e}", exc_info=True) raise def main():