fix: add URIs to resources and split resource access by name and address

This commit is contained in:
Teal Bauer 2025-04-15 10:51:50 +02:00
parent 8313b2bd7b
commit b37be370f8
2 changed files with 389 additions and 300 deletions

View File

@ -580,7 +580,7 @@ def handle_sigint(signum, frame):
# Resources provide information that can be loaded directly into context
# They focus on data and minimize metadata
@mcp.resource()
@mcp.resource(uri="/instance/{port}")
def ghidra_instance(port: int = None) -> dict:
"""Get detailed information about a Ghidra instance and the loaded program
@ -620,20 +620,19 @@ def ghidra_instance(port: int = None) -> dict:
return instance_info
@mcp.resource()
def decompiled_function(name: str = None, address: str = None, port: int = None) -> str:
"""Get decompiled C code for a function
@mcp.resource(uri="/instance/{port}/function/decompile/address/{address}")
def decompiled_function_by_address(port: int = None, address: str = None) -> str:
"""Get decompiled C code for a function by address
Args:
name: Function name (mutually exclusive with address)
address: Function address in hex format (mutually exclusive with address)
port: Specific Ghidra instance port (optional)
port: Specific Ghidra instance port
address: Function address in hex format
Returns:
str: The decompiled C code as a string, or error message
"""
if not name and not address:
return "Error: Either name or address parameter is required"
if not address:
return "Error: Address parameter is required"
port = _get_instance_port(port)
@ -642,10 +641,7 @@ def decompiled_function(name: str = None, address: str = None, port: int = None)
"style": "normalize"
}
if address:
endpoint = f"functions/{address}/decompile"
else:
endpoint = f"functions/by-name/{quote(name)}/decompile"
endpoint = f"functions/{address}/decompile"
response = safe_get(port, endpoint, params)
simplified = simplify_response(response)
@ -673,27 +669,72 @@ def decompiled_function(name: str = None, address: str = None, port: int = None)
return "Error: Could not extract decompiled code from response"
@mcp.resource()
def function_info(name: str = None, address: str = None, port: int = None) -> dict:
"""Get detailed information about a function
@mcp.resource(uri="/instance/{port}/function/decompile/name/{name}")
def decompiled_function_by_name(port: int = None, name: str = None) -> str:
"""Get decompiled C code for a function by name
Args:
name: Function name (mutually exclusive with address)
address: Function address in hex format (mutually exclusive with address)
port: Specific Ghidra instance port (optional)
port: Specific Ghidra instance port
name: Function name
Returns:
str: The decompiled C code as a string, or error message
"""
if not name:
return "Error: Name parameter is required"
port = _get_instance_port(port)
params = {
"syntax_tree": "false",
"style": "normalize"
}
endpoint = f"functions/by-name/{quote(name)}/decompile"
response = safe_get(port, endpoint, params)
simplified = simplify_response(response)
# For a resource, we want to directly return just the decompiled code
if (not isinstance(simplified, dict) or
not simplified.get("success", False) or
"result" not in simplified):
error_message = "Error: Could not decompile function"
if isinstance(simplified, dict) and "error" in simplified:
if isinstance(simplified["error"], dict):
error_message = simplified["error"].get("message", error_message)
else:
error_message = str(simplified["error"])
return error_message
# Extract just the decompiled code text
result = simplified["result"]
# Different endpoints may return the code in different fields, try all of them
if isinstance(result, dict):
for key in ["decompiled_text", "ccode", "decompiled"]:
if key in result:
return result[key]
return "Error: Could not extract decompiled code from response"
@mcp.resource(uri="/instance/{port}/function/info/address/{address}")
def function_info_by_address(port: int = None, address: str = None) -> dict:
"""Get detailed information about a function by address
Args:
port: Specific Ghidra instance port
address: Function address in hex format
Returns:
dict: Complete function information including signature, parameters, etc.
"""
if not name and not address:
return {"error": "Either name or address parameter is required"}
if not address:
return {"error": "Address parameter is required"}
port = _get_instance_port(port)
if address:
endpoint = f"functions/{address}"
else:
endpoint = f"functions/by-name/{quote(name)}"
endpoint = f"functions/{address}"
response = safe_get(port, endpoint)
simplified = simplify_response(response)
@ -709,27 +750,115 @@ def function_info(name: str = None, address: str = None, port: int = None) -> di
# Return just the function data without API metadata
return simplified["result"]
@mcp.resource()
def disassembly(name: str = None, address: str = None, port: int = None) -> str:
"""Get disassembled instructions for a function
@mcp.resource(uri="/instance/{port}/function/info/name/{name}")
def function_info_by_name(port: int = None, name: str = None) -> dict:
"""Get detailed information about a function by name
Args:
name: Function name (mutually exclusive with address)
address: Function address in hex format (mutually exclusive with address)
port: Specific Ghidra instance port (optional)
port: Specific Ghidra instance port
name: Function name
Returns:
dict: Complete function information including signature, parameters, etc.
"""
if not name:
return {"error": "Name parameter is required"}
port = _get_instance_port(port)
endpoint = f"functions/by-name/{quote(name)}"
response = safe_get(port, endpoint)
simplified = simplify_response(response)
if (not isinstance(simplified, dict) or
not simplified.get("success", False) or
"result" not in simplified):
error = {"error": "Could not get function information"}
if isinstance(simplified, dict) and "error" in simplified:
error["error_details"] = simplified["error"]
return error
# Return just the function data without API metadata
return simplified["result"]
@mcp.resource(uri="/instance/{port}/function/disassembly/address/{address}")
def disassembly_by_address(port: int = None, address: str = None) -> str:
"""Get disassembled instructions for a function by address
Args:
port: Specific Ghidra instance port
address: Function address in hex format
Returns:
str: Formatted disassembly listing as a string
"""
if not name and not address:
return "Error: Either name or address parameter is required"
if not address:
return "Error: Address parameter is required"
port = _get_instance_port(port)
if address:
endpoint = f"functions/{address}/disassembly"
else:
endpoint = f"functions/by-name/{quote(name)}/disassembly"
endpoint = f"functions/{address}/disassembly"
response = safe_get(port, endpoint)
simplified = simplify_response(response)
if (not isinstance(simplified, dict) or
not simplified.get("success", False) or
"result" not in simplified):
error_message = "Error: Could not get disassembly"
if isinstance(simplified, dict) and "error" in simplified:
if isinstance(simplified["error"], dict):
error_message = simplified["error"].get("message", error_message)
else:
error_message = str(simplified["error"])
return error_message
# For a resource, we want to directly return just the disassembly text
result = simplified["result"]
# Check if we have a disassembly_text field already
if isinstance(result, dict) and "disassembly_text" in result:
return result["disassembly_text"]
# Otherwise if we have raw instructions, format them ourselves
if isinstance(result, dict) and "instructions" in result and isinstance(result["instructions"], list):
disasm_text = ""
for instr in result["instructions"]:
if isinstance(instr, dict):
addr = instr.get("address", "")
mnemonic = instr.get("mnemonic", "")
operands = instr.get("operands", "")
bytes_str = instr.get("bytes", "")
# Format: address: bytes mnemonic operands
disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n"
return disasm_text
# If we have a direct disassembly field, try that as well
if isinstance(result, dict) and "disassembly" in result:
return result["disassembly"]
return "Error: Could not extract disassembly from response"
@mcp.resource(uri="/instance/{port}/function/disassembly/name/{name}")
def disassembly_by_name(port: int = None, name: str = None) -> str:
"""Get disassembled instructions for a function by name
Args:
port: Specific Ghidra instance port
name: Function name
Returns:
str: Formatted disassembly listing as a string
"""
if not name:
return "Error: Name parameter is required"
port = _get_instance_port(port)
endpoint = f"functions/by-name/{quote(name)}/disassembly"
response = safe_get(port, endpoint)
simplified = simplify_response(response)
@ -789,23 +918,36 @@ def analyze_function_prompt(name: str = None, address: str = None, port: int = N
# Get function name if only address is provided
if address and not name:
fn_info = function_info(address=address, port=port)
fn_info = function_info_by_address(address=address, port=port)
if isinstance(fn_info, dict) and "name" in fn_info:
name = fn_info["name"]
# Create the template that guides analysis
decompiled = ""
disasm = ""
fn_info = None
if address:
decompiled = decompiled_function_by_address(address=address, port=port)
disasm = disassembly_by_address(address=address, port=port)
fn_info = function_info_by_address(address=address, port=port)
elif name:
decompiled = decompiled_function_by_name(name=name, port=port)
disasm = disassembly_by_name(name=name, port=port)
fn_info = function_info_by_name(name=name, port=port)
return {
"prompt": f"""
Analyze the following function: {name or address}
Decompiled code:
```c
{decompiled_function(name=name, address=address, port=port)}
{decompiled}
```
Disassembly:
```
{disassembly(name=name, address=address, port=port)}
{disasm}
```
1. What is the purpose of this function?
@ -815,7 +957,7 @@ def analyze_function_prompt(name: str = None, address: str = None, port: int = N
5. Describe the algorithm or process being implemented.
""",
"context": {
"function_info": function_info(name=name, address=address, port=port)
"function_info": fn_info
}
}
@ -832,18 +974,31 @@ def identify_vulnerabilities_prompt(name: str = None, address: str = None, port:
# Get function name if only address is provided
if address and not name:
fn_info = function_info(address=address, port=port)
fn_info = function_info_by_address(address=address, port=port)
if isinstance(fn_info, dict) and "name" in fn_info:
name = fn_info["name"]
# Create the template focused on security analysis
decompiled = ""
disasm = ""
fn_info = None
if address:
decompiled = decompiled_function_by_address(address=address, port=port)
disasm = disassembly_by_address(address=address, port=port)
fn_info = function_info_by_address(address=address, port=port)
elif name:
decompiled = decompiled_function_by_name(name=name, port=port)
disasm = disassembly_by_name(name=name, port=port)
fn_info = function_info_by_name(name=name, port=port)
return {
"prompt": f"""
Analyze the following function for security vulnerabilities: {name or address}
Decompiled code:
```c
{decompiled_function(name=name, address=address, port=port)}
{decompiled}
```
Look for these vulnerability types:
@ -863,8 +1018,8 @@ def identify_vulnerabilities_prompt(name: str = None, address: str = None, port:
- Recommend a fix
""",
"context": {
"function_info": function_info(name=name, address=address, port=port),
"disassembly": disassembly(name=name, address=address, port=port)
"function_info": fn_info,
"disassembly": disasm
}
}

View File

@ -82,160 +82,155 @@ async def test_bridge():
# List tools
logger.info("Listing tools...")
tools_result = await session.list_tools()
# logger.info(f"Tools result: {tools_result}")
# logger.info(f"Tools result: {tools_result}") # Optional: uncomment for verbose tool listing
# Call the discover_instances tool
logger.info("Calling discover_instances tool...")
discover_instances_result = await session.call_tool("discover_instances")
# Call the instances_discover tool
logger.info("Calling instances_discover tool...")
discover_instances_result = await session.call_tool("instances_discover")
logger.info(f"Discover instances result: {discover_instances_result}")
# Call the list_instances tool
logger.info("Calling list_instances tool...")
list_instances_result = await session.call_tool("list_instances")
# Call the instances_list tool
logger.info("Calling instances_list tool...")
list_instances_result = await session.call_tool("instances_list")
logger.info(f"List instances result: {list_instances_result}")
# Call the list_functions tool with the new HATEOAS API
logger.info("Calling list_functions tool...")
# Set the current instance to use for subsequent calls
logger.info(f"Setting current instance to port {GHYDRAMCP_TEST_PORT}...")
use_instance_result = await session.call_tool("instances_use", arguments={"port": GHYDRAMCP_TEST_PORT})
logger.info(f"Use instance result: {use_instance_result}")
# Call the functions_list tool (no port needed now)
logger.info("Calling functions_list tool...")
list_functions_result = await session.call_tool(
"list_functions",
arguments={"port": GHYDRAMCP_TEST_PORT, "offset": 0, "limit": 5}
"functions_list",
arguments={"offset": 0, "limit": 5} # No port needed
)
logger.info(f"List functions result: {list_functions_result}")
# Test the current program endpoint
logger.info("Calling get_program_info tool...")
current_program_result = await session.call_tool(
"get_program_info",
arguments={"port": GHYDRAMCP_TEST_PORT}
)
logger.info(f"Current program result: {current_program_result}")
# Test the current instance endpoint
logger.info("Calling instances_current tool...")
current_program_result = await session.call_tool("instances_current") # No args needed
logger.info(f"Current instance result: {current_program_result}")
# Add assertion for current instance result structure if needed
current_data = json.loads(current_program_result.content[0].text)
assert "port" in current_data, "Missing port in instances_current result"
assert "program_name" in current_data, "Missing program_name in instances_current result"
# Test mutating operations by changing and reverting
logger.info("Testing mutating operations...")
try:
# Get a known function to test with from list_functions result
# Get a known function to test with from functions_list result
list_funcs = await session.call_tool(
"list_functions",
arguments={"port": GHYDRAMCP_TEST_PORT, "offset": 0, "limit": 5}
"functions_list",
arguments={"offset": 0, "limit": 5} # No port needed
)
if not list_funcs or not list_funcs.content:
logger.warning("No functions found - skipping mutating tests")
logger.warning("No functions found via functions_list - skipping mutating tests")
return
# Parse the JSON response from list_functions using helper
# Parse the JSON response from functions_list using helper
try:
list_funcs_data = await assert_standard_mcp_success_response(list_funcs.content, expected_result_type=list)
func_list = list_funcs_data.get("result", [])
if not func_list:
logger.warning("No functions in list_functions result - skipping mutating tests")
logger.warning("No functions in functions_list result - skipping mutating tests")
return
# Get first function's name and address
# Ensure the item is a dictionary before accessing keys
first_func = func_list[0]
if not isinstance(first_func, dict):
logger.warning(f"First item in functions_list is not a dict: {first_func} - skipping mutating tests")
return
func_name = first_func.get("name", "")
func_address = first_func.get("address", "")
if not func_name or not func_address:
logger.warning("No function name/address found in list_functions result - skipping mutating tests")
logger.warning("No function name/address found in functions_list result - skipping mutating tests")
return
except AssertionError as e:
logger.warning(f"Error processing list_functions data: {e} - skipping mutating tests")
except (AssertionError, IndexError, TypeError) as e:
logger.warning(f"Error processing functions_list data: {e} - skipping mutating tests")
return
# Test function renaming
# Test function renaming using functions_rename
original_name = func_name
test_name = f"{func_name}_test"
test_name = f"{func_name}_test_mcp" # Make name slightly more unique
# Test successful rename operations using rename_function
rename_args = {"port": GHYDRAMCP_TEST_PORT, "name": original_name, "new_name": test_name}
logger.info(f"Calling rename_function with args: {rename_args}")
rename_result = await session.call_tool("rename_function", arguments=rename_args)
# Test successful rename operations using functions_rename (no port needed)
rename_args = {"old_name": original_name, "new_name": test_name} # Use old_name instead of name
logger.info(f"Calling functions_rename with args: {rename_args}")
rename_result = await session.call_tool("functions_rename", arguments=rename_args)
rename_data = json.loads(rename_result.content[0].text) # Parse simple response
assert rename_data.get("success") is True, f"Rename failed: {rename_data}"
logger.info(f"Rename result: {rename_result}")
# Verify rename by getting the function
renamed_func = await session.call_tool("get_function", arguments={"port": 8192, "name": test_name})
# Verify rename by getting the function using functions_get (no port needed)
renamed_func = await session.call_tool("functions_get", arguments={"name": test_name})
renamed_data = await assert_standard_mcp_success_response(renamed_func.content, expected_result_type=dict)
assert renamed_data.get("result", {}).get("name") == test_name, f"Renamed function has wrong name: {renamed_data}"
logger.info(f"Renamed function result: {renamed_func}")
# Rename back to original
revert_args = {"port": GHYDRAMCP_TEST_PORT, "name": test_name, "new_name": original_name}
logger.info(f"Calling rename_function with args: {revert_args}")
revert_result = await session.call_tool("rename_function", arguments=revert_args)
# Rename back to original using functions_rename (no port needed)
revert_args = {"old_name": test_name, "new_name": original_name} # Use old_name
logger.info(f"Calling functions_rename with args: {revert_args}")
revert_result = await session.call_tool("functions_rename", arguments=revert_args)
revert_data = json.loads(revert_result.content[0].text) # Parse simple response
assert revert_data.get("success") is True, f"Revert rename failed: {revert_data}"
logger.info(f"Revert rename result: {revert_result}")
# Verify revert by getting the function
original_func = await session.call_tool("get_function", arguments={"port": GHYDRAMCP_TEST_PORT, "name": original_name})
# Verify revert by getting the function using functions_get (no port needed)
original_func = await session.call_tool("functions_get", arguments={"name": original_name})
original_data = await assert_standard_mcp_success_response(original_func.content, expected_result_type=dict)
assert original_data.get("result", {}).get("name") == original_name, f"Original function has wrong name: {original_data}"
logger.info(f"Original function result: {original_func}")
# Test get_function with address parameter
logger.info(f"Calling get_function with address: {func_address}")
get_by_addr_result = await session.call_tool("get_function", arguments={"port": GHYDRAMCP_TEST_PORT, "address": func_address})
# Test functions_get with address parameter (no port needed)
logger.info(f"Calling functions_get with address: {func_address}")
get_by_addr_result = await session.call_tool("functions_get", arguments={"address": func_address})
get_by_addr_data = await assert_standard_mcp_success_response(get_by_addr_result.content, expected_result_type=dict)
result_data = get_by_addr_data.get("result", {})
assert "name" in result_data, "Missing name field in get_function result"
assert "address" in result_data, "Missing address field in get_function result"
assert "signature" in result_data, "Missing signature field in get_function result"
assert result_data.get("name") == original_name, f"Wrong name in get_function: {result_data.get('name')}"
assert "name" in result_data, "Missing name field in functions_get result"
assert "address" in result_data, "Missing address field in functions_get result"
assert "signature" in result_data, "Missing signature field in functions_get result"
assert result_data.get("name") == original_name, f"Wrong name in functions_get: {result_data.get('name')}"
logger.info(f"Get function by address result: {get_by_addr_result}")
# Test decompile_function
logger.info(f"Calling decompile_function with address: {func_address}")
decompile_result = await session.call_tool("decompile_function", arguments={"port": GHYDRAMCP_TEST_PORT, "address": func_address})
# Test functions_decompile (no port needed)
logger.info(f"Calling functions_decompile with address: {func_address}")
decompile_result = await session.call_tool("functions_decompile", arguments={"address": func_address})
decompile_data = await assert_standard_mcp_success_response(decompile_result.content, expected_result_type=dict)
# The decompiled code might be in different fields depending on version
has_decompiled = False
if "decompiled_code" in decompile_data:
has_decompiled = True
elif "decompiled_text" in decompile_data:
has_decompiled = True
elif "result" in decompile_data and isinstance(decompile_data["result"], dict):
result = decompile_data["result"]
if "ccode" in result or "decompiled" in result or "decompiled_text" in result:
has_decompiled = True
assert has_decompiled, f"Decompile result missing decompiled code: {decompile_data}"
# Check for decompiled code (bridge adds 'decompiled_code' field)
assert "decompiled_code" in decompile_data, f"Decompile result missing decompiled_code field: {decompile_data}"
assert isinstance(decompile_data["decompiled_code"], str), "decompiled_code should be a string"
logger.info(f"Decompile function result: {decompile_result}")
# Test disassemble_function
logger.info(f"Calling disassemble_function with address: {func_address}")
disassemble_result = await session.call_tool("disassemble_function", arguments={"port": GHYDRAMCP_TEST_PORT, "address": func_address})
# Test functions_disassemble (no port needed)
logger.info(f"Calling functions_disassemble with address: {func_address}")
disassemble_result = await session.call_tool("functions_disassemble", arguments={"address": func_address})
disassemble_data = json.loads(disassemble_result.content[0].text)
assert disassemble_data.get("success") is True, f"Disassemble failed: {disassemble_data}"
# Check for disassembly text in the simplified format
has_disassembly = False
if "disassembly" in disassemble_data:
has_disassembly = True
elif "result" in disassemble_data and isinstance(disassemble_data["result"], dict):
result = disassemble_data["result"]
if "disassembly_text" in result:
has_disassembly = True
elif "instructions" in result:
has_disassembly = True
# Check for disassembly text (bridge adds 'disassembly_text' field)
assert "result" in disassemble_data and isinstance(disassemble_data["result"], dict), "Disassembly result missing 'result' object"
result = disassemble_data["result"]
assert "disassembly_text" in result, f"Disassembly result missing disassembly_text field: {disassemble_data}"
assert isinstance(result["disassembly_text"], str), "disassembly_text should be a string"
assert has_disassembly, f"Disassembly result missing disassembly text: {disassemble_data}"
# Check additional function info
if "function_name" in disassemble_data:
assert isinstance(disassemble_data["function_name"], str), "function_name should be a string"
if "function_address" in disassemble_data:
assert isinstance(disassemble_data["function_address"], str), "function_address should be a string"
# Check additional function info if present
if "name" in result:
assert isinstance(result["name"], str), "function name should be a string"
if "address" in result:
assert isinstance(result["address"], str), "function address should be a string"
logger.info(f"Disassemble function result: {disassemble_result}")
# Test get_function_variables instead of list_variables
logger.info("Calling get_function_variables tool...")
function_vars_result = await session.call_tool("get_function_variables", arguments={"port": 8192, "address": func_address})
# Test functions_get_variables (no port needed)
logger.info("Calling functions_get_variables tool...")
function_vars_result = await session.call_tool("functions_get_variables", arguments={"address": func_address})
try:
vars_data = await assert_standard_mcp_success_response(function_vars_result.content, expected_result_type=dict)
if "result" in vars_data and isinstance(vars_data["result"], dict) and "variables" in vars_data["result"]:
@ -243,134 +238,84 @@ async def test_bridge():
if variables_list and len(variables_list) > 0:
for var in variables_list:
assert "name" in var, f"Variable missing name: {var}"
assert "dataType" in var, f"Variable missing dataType: {var}"
logger.info(f"Function variables result: {function_vars_result}")
else:
logger.info("Function variables available but no variables found in function.")
except (AssertionError, KeyError) as e:
logger.warning(f"Could not validate function variables: {e}")
# Test comment operations using set_comment
test_comment = "Test comment from MCP client"
comment_args = {"port": 8192, "address": func_address, "comment": test_comment, "comment_type": "plate"}
logger.info(f"Calling set_comment with args: {comment_args}")
comment_result = await session.call_tool("set_comment", arguments=comment_args)
comment_data = json.loads(comment_result.content[0].text)
assert comment_data.get("success") is True, f"Add comment failed: {comment_data}"
logger.info(f"Add comment result: {comment_result}")
# Remove comment
remove_comment_args = {"port": 8192, "address": func_address, "comment": "", "comment_type": "plate"}
logger.info(f"Calling set_comment with args: {remove_comment_args}")
remove_comment_result = await session.call_tool("set_comment", arguments=remove_comment_args)
remove_data = json.loads(remove_comment_result.content[0].text)
assert remove_data.get("success") is True, f"Remove comment failed: {remove_data}"
logger.info(f"Remove comment result: {remove_comment_result}")
# Test comments using set_decompiler_comment (which is a convenience wrapper for set_comment)
test_comment = "Test decompiler comment from MCP client"
decompiler_comment_args = {"port": 8192, "address": func_address, "comment": test_comment}
logger.info(f"Calling set_decompiler_comment with args: {decompiler_comment_args}")
decompiler_comment_result = await session.call_tool("set_decompiler_comment", arguments=decompiler_comment_args)
decompiler_comment_data = json.loads(decompiler_comment_result.content[0].text)
assert decompiler_comment_data.get("success") is True, f"Add decompiler comment failed: {decompiler_comment_data}"
logger.info(f"Add decompiler comment result: {decompiler_comment_result}")
# Remove decompiler comment
remove_decompiler_comment_args = {"port": 8192, "address": func_address, "comment": ""}
logger.info(f"Calling set_decompiler_comment with args: {remove_decompiler_comment_args}")
remove_decompiler_comment_result = await session.call_tool("set_decompiler_comment", arguments=remove_decompiler_comment_args)
remove_decompiler_data = json.loads(remove_decompiler_comment_result.content[0].text)
assert remove_decompiler_data.get("success") is True, f"Remove decompiler comment failed: {remove_decompiler_data}"
logger.info(f"Remove decompiler comment result: {remove_decompiler_comment_result}")
# REMOVED: Tests for set_comment and set_decompiler_comment as tools no longer exist
# Test expected failure cases
# Try to rename non-existent function
bad_rename_args = {"port": 8192, "name": "nonexistent_function", "new_name": "should_fail"}
logger.info(f"Calling rename_function with args: {bad_rename_args}")
# Try to rename non-existent function using functions_rename
bad_rename_args = {"old_name": "nonexistent_function_mcp", "new_name": "should_fail"}
logger.info(f"Calling functions_rename with args: {bad_rename_args}")
try:
bad_rename_result = await session.call_tool("rename_function", arguments=bad_rename_args)
bad_rename_result = await session.call_tool("functions_rename", arguments=bad_rename_args)
logger.info(f"Bad rename result: {bad_rename_result}") # Log the response
bad_rename_data = json.loads(bad_rename_result.content[0].text)
assert bad_rename_data.get("success") is False, f"Renaming non-existent function should fail, but got: {bad_rename_data}"
except Exception as e:
# It's also acceptable if the tool call itself fails, as long as it doesn't succeed
logger.info(f"Expected failure: rename_function properly rejected bad parameters: {e}")
logger.info(f"Expected failure: functions_rename properly rejected bad parameters: {e}")
# Try to get non-existent function
# Try to get non-existent function using functions_get
bad_get_result = await session.call_tool(
"get_function",
arguments={"port": 8192, "name": "nonexistent_function"}
"functions_get",
arguments={"name": "nonexistent_function_mcp"}
)
logger.info(f"Bad get result: {bad_get_result}") # Log the response
bad_get_data = json.loads(bad_get_result.content[0].text)
assert bad_get_data.get("success") is False, f"Getting non-existent function should fail, but got: {bad_get_data}"
# Try to comment on invalid address
bad_comment_args = {"port": 8192, "address": "0xinvalid", "comment": "should fail", "comment_type": "plate"}
logger.info(f"Calling set_comment with args: {bad_comment_args}")
bad_comment_result = await session.call_tool("set_comment", arguments=bad_comment_args)
bad_comment_data = json.loads(bad_comment_result.content[0].text)
assert bad_comment_data.get("success") is False, "Commenting on invalid address should fail"
# REMOVED: Test for commenting on invalid address as set_comment tool no longer exists
# REMOVED: Tests for get_current_address and get_current_function as tools no longer exist
# Test get_current_address
logger.info("Calling get_current_address tool...")
current_addr_result = await session.call_tool("get_current_address", arguments={"port": 8192})
current_addr_data = await assert_standard_mcp_success_response(current_addr_result.content, expected_result_type=dict)
assert "address" in current_addr_data.get("result", {}), "Missing address in get_current_address result"
assert isinstance(current_addr_data.get("result", {}).get("address", ""), str), "Address should be a string"
logger.info(f"Get current address result: {current_addr_result}")
# Test get_current_function
logger.info("Calling get_current_function tool...")
current_func_result = await session.call_tool("get_current_function", arguments={"port": 8192})
current_func_data = await assert_standard_mcp_success_response(current_func_result.content, expected_result_type=dict)
result_data = current_func_data.get("result", {})
assert "name" in result_data, "Missing name in get_current_function result"
assert "address" in result_data, "Missing address in get_current_function result"
assert "signature" in result_data, "Missing signature in get_current_function result"
logger.info(f"Get current function result: {current_func_result}")
# Test read_memory functionality
logger.info(f"Calling read_memory with address: {func_address}")
read_memory_result = await session.call_tool("read_memory", arguments={"port": 8192, "address": func_address, "length": 16})
# Test memory_read functionality (no port needed)
logger.info(f"Calling memory_read with address: {func_address}")
read_memory_result = await session.call_tool("memory_read", arguments={"address": func_address, "length": 16})
read_memory_data = json.loads(read_memory_result.content[0].text)
assert read_memory_data.get("success") is True, f"Read memory failed: {read_memory_data}"
assert "hexBytes" in read_memory_data, "Missing hexBytes in read_memory result"
assert "rawBytes" in read_memory_data, "Missing rawBytes in read_memory result"
assert read_memory_data.get("address") == func_address, f"Wrong address in read_memory result: {read_memory_data.get('address')}"
# Bridge simplification puts data directly in response
assert "hexBytes" in read_memory_data, "Missing hexBytes in memory_read result"
assert "rawBytes" in read_memory_data, "Missing rawBytes in memory_read result"
assert read_memory_data.get("address") == func_address, f"Wrong address in memory_read result: {read_memory_data.get('address')}"
logger.info(f"Read memory result: {read_memory_result}")
# Test data operations (create, rename, change type)
# Test data operations (create, rename, change type, delete) using namespaced tools
logger.info("Testing data operations...")
try:
# Get a memory address to create test data
data_address = func_address
original_data_type = "undefined" # Placeholder, might not exist initially
# First create test data
create_data_args = {"port": 8192, "address": data_address, "data_type": "uint32_t"}
logger.info(f"Calling create_data with args: {create_data_args}")
create_data_result = await session.call_tool("create_data", arguments=create_data_args)
# First create test data using data_create (no port needed)
create_data_args = {"address": data_address, "data_type": "uint32_t"}
logger.info(f"Calling data_create with args: {create_data_args}")
create_data_result = await session.call_tool("data_create", arguments=create_data_args)
create_data_response = json.loads(create_data_result.content[0].text)
assert create_data_response.get("success") is True, f"Create data failed: {create_data_response}"
logger.info(f"Create data result: {create_data_result}")
original_data_type = "uint32_t" # Update original type
# Test Case 1: Data rename operation (name only)
test_data_name = "test_data_item"
rename_data_args = {"port": 8192, "address": data_address, "name": test_data_name}
logger.info(f"Calling rename_data with args: {rename_data_args}")
rename_data_result = await session.call_tool("rename_data", arguments=rename_data_args)
# Test Case 1: Data rename operation using data_rename (no port needed)
test_data_name = "test_data_item_mcp"
rename_data_args = {"address": data_address, "name": test_data_name}
logger.info(f"Calling data_rename with args: {rename_data_args}")
rename_data_result = await session.call_tool("data_rename", arguments=rename_data_args)
rename_data_response = json.loads(rename_data_result.content[0].text)
assert rename_data_response.get("success") is True, f"Rename data failed: {rename_data_response}"
logger.info(f"Rename data result: {rename_data_result}")
# Verify the name was changed
# Verify the name was changed (check the result field)
if rename_data_response.get("result", {}).get("name") != test_data_name:
logger.warning(f"Rename operation didn't set the expected name. Got: {rename_data_response.get('result', {}).get('name')}")
# Test Case 2: Data type change operation (type only)
change_type_args = {"port": 8192, "address": data_address, "data_type": "int"}
logger.info(f"Calling set_data_type with args: {change_type_args}")
change_type_result = await session.call_tool("set_data_type", arguments=change_type_args)
# Test Case 2: Data type change operation using data_set_type (no port needed)
change_type_args = {"address": data_address, "data_type": "int"}
logger.info(f"Calling data_set_type with args: {change_type_args}")
change_type_result = await session.call_tool("data_set_type", arguments=change_type_args)
change_type_response = json.loads(change_type_result.content[0].text)
assert change_type_response.get("success") is True, f"Change data type failed: {change_type_response}"
logger.info(f"Change data type result: {change_type_result}")
@ -382,44 +327,35 @@ async def test_bridge():
if result.get("name") != test_data_name:
logger.warning(f"Type change operation didn't preserve the name. Expected: {test_data_name}, Got: {result.get('name')}")
# Test Case 3: Combined update operation (both name and type)
update_data_args = {
"port": 8192,
"address": data_address,
"name": "updated_data_item",
"data_type": "byte"
}
logger.info(f"Calling update_data with args: {update_data_args}")
update_data_result = await session.call_tool("update_data", arguments=update_data_args)
update_data_response = json.loads(update_data_result.content[0].text)
assert update_data_response.get("success") is True, f"Update data failed: {update_data_response}"
logger.info(f"Update data result: {update_data_result}")
# REMOVED: Test Case 3 (Combined update) as update_data tool no longer exists
# Verify both name and type were changed
result = update_data_response.get("result", {})
if result.get("name") != "updated_data_item":
logger.warning(f"Update operation didn't set the expected name. Got: {result.get('name')}")
if result.get("dataType") != "byte":
logger.warning(f"Update operation didn't set the expected type. Got: {result.get('dataType')}")
# Clean up by restoring original data type
restore_type_args = {"port": 8192, "address": data_address, "data_type": "uint32_t"}
logger.info(f"Restoring data type with args: {restore_type_args}")
restore_type_result = await session.call_tool("set_data_type", arguments=restore_type_args)
restore_type_response = json.loads(restore_type_result.content[0].text)
assert restore_type_response.get("success") is True, f"Restore data type failed: {restore_type_response}"
# Clean up by deleting the created data using data_delete
delete_data_args = {"address": data_address}
logger.info(f"Deleting data with args: {delete_data_args}")
delete_data_result = await session.call_tool("data_delete", arguments=delete_data_args)
delete_data_response = json.loads(delete_data_result.content[0].text)
assert delete_data_response.get("success") is True, f"Delete data failed: {delete_data_response}"
logger.info(f"Delete data result: {delete_data_result}")
except Exception as e:
logger.warning(f"Error testing data operations: {e} - This is not critical")
# Test callgraph functionality - handle possible failure gracefully
if func_address:
logger.info(f"Calling get_callgraph with address: {func_address}")
logger.warning(f"Error testing data operations: {e} - This is not critical. Attempting cleanup.")
# Attempt cleanup even if tests failed mid-way
try:
callgraph_result = await session.call_tool("get_callgraph", arguments={"port": 8192, "address": func_address})
delete_data_args = {"address": data_address}
await session.call_tool("data_delete", arguments=delete_data_args)
logger.info("Data cleanup attempted.")
except Exception as cleanup_e:
logger.error(f"Data cleanup failed: {cleanup_e}")
# Test callgraph functionality using analysis_get_callgraph (no port needed)
if func_address:
logger.info(f"Calling analysis_get_callgraph with address: {func_address}")
try:
callgraph_result = await session.call_tool("analysis_get_callgraph", arguments={"function": func_address})
callgraph_data = json.loads(callgraph_result.content[0].text)
if callgraph_data.get("success"):
assert "result" in callgraph_data, "Missing result in get_callgraph response"
assert "result" in callgraph_data, "Missing result in analysis_get_callgraph response"
# The result could be either a dict with nodes/edges or a direct graph representation
logger.info(f"Get callgraph result: successful")
else:
@ -428,11 +364,11 @@ async def test_bridge():
except Exception as e:
logger.warning(f"Error in callgraph test: {e} - This is not critical")
# Test function signature operations
# Test function signature operations using functions_set_signature
logger.info("Testing function signature operations...")
try:
# Get current signature
get_func_for_sig = await session.call_tool("get_function", arguments={"port": 8192, "address": func_address})
# Get current signature using functions_get
get_func_for_sig = await session.call_tool("functions_get", arguments={"address": func_address})
get_func_for_sig_data = await assert_standard_mcp_success_response(get_func_for_sig.content, expected_result_type=dict)
original_signature = get_func_for_sig_data.get("result", {}).get("signature", "")
@ -440,38 +376,36 @@ async def test_bridge():
logger.warning("Could not get original signature - skipping signature test")
else:
# Create test signature by adding parameters
modified_signature = f"int {func_name}(uint32_t *data, int block_count, uint32_t *key)"
modified_signature = f"int {func_name}(uint32_t *mcp_data, int mcp_count, uint32_t *mcp_key)"
logger.info(f"Original signature: {original_signature}")
logger.info(f"Setting function signature to: {modified_signature}")
# Set new signature
set_sig_result = await session.call_tool("set_function_signature",
arguments={"port": 8192,
"address": func_address,
# Set new signature using functions_set_signature (no port needed)
set_sig_result = await session.call_tool("functions_set_signature",
arguments={"address": func_address,
"signature": modified_signature})
set_sig_data = json.loads(set_sig_result.content[0].text)
assert set_sig_data.get("success") is True, f"Set signature failed: {set_sig_data}"
logger.info(f"Set signature result: {set_sig_result}")
# Verify the change
verify_sig_result = await session.call_tool("get_function", arguments={"port": 8192, "address": func_address})
# Verify the change using functions_get
verify_sig_result = await session.call_tool("functions_get", arguments={"address": func_address})
verify_sig_data = await assert_standard_mcp_success_response(verify_sig_result.content, expected_result_type=dict)
new_signature = verify_sig_data.get("result", {}).get("signature", "")
assert "uint32_t *data" in new_signature, f"Signature not properly updated: {new_signature}"
assert "uint32_t *mcp_data" in new_signature, f"Signature not properly updated: {new_signature}"
logger.info(f"Updated signature: {new_signature}")
# Restore original signature
# Restore original signature using functions_set_signature
logger.info(f"Restoring original signature: {original_signature}")
restore_sig_result = await session.call_tool("set_function_signature",
arguments={"port": 8192,
"address": func_address,
restore_sig_result = await session.call_tool("functions_set_signature",
arguments={"address": func_address,
"signature": original_signature})
restore_sig_data = json.loads(restore_sig_result.content[0].text)
assert restore_sig_data.get("success") is True, f"Restore signature failed: {restore_sig_data}"
logger.info(f"Restore signature result: {restore_sig_result}")
# Verify restoration
final_func_result = await session.call_tool("get_function", arguments={"port": 8192, "address": func_address})
# Verify restoration using functions_get
final_func_result = await session.call_tool("functions_get", arguments={"address": func_address})
final_func_data = await assert_standard_mcp_success_response(final_func_result.content, expected_result_type=dict)
final_signature = final_func_data.get("result", {}).get("signature", "")
assert final_signature == original_signature, f"Signature not properly restored: {final_signature}"
@ -480,7 +414,7 @@ async def test_bridge():
logger.warning(f"Error in signature test: {e} - This is not critical")
except Exception as e:
logger.error(f"Error testing mutating operations: {e}")
logger.error(f"Error testing mutating operations: {e}", exc_info=True)
raise
def main():