diff --git a/CHANGELOG.md b/CHANGELOG.md index 91a3065..77115e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,33 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] +### Added +- **Progress Reporting for Long Operations:** 7 MCP prompts now report real-time progress during multi-step scanning operations: + - `malware_triage` - Reports progress across 21 scanning steps + - `analyze_imports` - Reports progress across 12 capability categories + - `identify_crypto` - Reports progress across 20 pattern scans + - `find_authentication` - Reports progress across 30 auth pattern scans + - `find_main_logic` - Reports progress across 22 entry point searches + - `find_error_handlers` - Reports progress across 35 error pattern scans + - `find_config_parsing` - Reports progress across 23 config pattern scans + - Uses FastMCP's `Context.report_progress()` for numeric progress updates + - Uses `Context.info()` for descriptive step notifications + - Helper functions `report_step()` and `report_progress()` for consistent reporting +- **Specialized Analysis Prompts:** 13 new MCP prompts for common reverse engineering workflows: + - `analyze_strings` - String analysis with categorization and cross-reference guidance + - `trace_data_flow` - Data flow and taint analysis through functions + - `identify_crypto` - Cryptographic function and constant identification + - `malware_triage` - Quick malware analysis with capability assessment checklist + - `analyze_protocol` - Network/file protocol reverse engineering framework + - `find_main_logic` - Navigate past CRT initialization to find actual program logic + - `analyze_imports` - Categorize imports by capability with suspicious pattern detection + - `find_authentication` - Locate auth, license checks, and credential handling code + - `analyze_switch_table` - Reverse engineer command dispatchers and jump tables + - `find_config_parsing` - Identify configuration file parsing and 
settings management + - `compare_functions` - Compare two functions for similarity (patches, variants, libraries) + - `document_struct` - Comprehensively document data structure fields and usage + - `find_error_handlers` - Map error handling, cleanup routines, and exit paths + ## [2025.12.1] - 2025-12-01 ### Added @@ -22,14 +49,14 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). - `cursor_delete(cursor_id)` - Delete specific cursor - `cursor_delete_all()` - Delete all session cursors - **Enumeration Resources:** New lightweight MCP resources for quick data enumeration (more efficient than tool calls): - - `/instances` - List all active Ghidra instances - - `/instance/{port}/summary` - Program overview with statistics - - `/instance/{port}/functions` - List functions (capped at 1000) - - `/instance/{port}/strings` - List strings (capped at 500) - - `/instance/{port}/data` - List data items (capped at 1000) - - `/instance/{port}/structs` - List struct types (capped at 500) - - `/instance/{port}/xrefs/to/{address}` - Cross-references to an address - - `/instance/{port}/xrefs/from/{address}` - Cross-references from an address + - `ghidra://instances` - List all active Ghidra instances + - `ghidra://instance/{port}/summary` - Program overview with statistics + - `ghidra://instance/{port}/functions` - List functions (capped at 1000) + - `ghidra://instance/{port}/strings` - List strings (capped at 500) + - `ghidra://instance/{port}/data` - List data items (capped at 1000) + - `ghidra://instance/{port}/structs` - List struct types (capped at 500) + - `ghidra://instance/{port}/xrefs/to/{address}` - Cross-references to an address + - `ghidra://instance/{port}/xrefs/from/{address}` - Cross-references from an address ### Changed - **MCP Dependency Upgrade:** Updated from `mcp==1.6.0` to `mcp>=1.22.0` for FastMCP Context support. 
diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index bc6401e..a5c1eac 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -61,6 +61,126 @@ MAX_GREP_REPETITION_OPS = 15 # Maximum repetition operators (* + ? {}) MAX_GREP_RECURSION_DEPTH = 10 # Maximum depth for nested data grep matching +# ================= Progress Reporting System ================= +# Provides async progress updates for long-running operations + +import asyncio +from contextlib import asynccontextmanager + + +class ProgressReporter: + """Helper class for reporting progress during long operations. + + Wraps FastMCP's context to provide convenient progress reporting with + automatic handling of sync vs async contexts. + + Usage: + progress = ProgressReporter(ctx, "Loading functions", total=1000) + for i, item in enumerate(items): + await progress.update(i + 1) + await progress.complete("Loaded {count} functions") + """ + + def __init__(self, ctx: Context, operation: str, total: int = 100): + """Initialize progress reporter. + + Args: + ctx: FastMCP Context (can be None for sync operations) + operation: Human-readable description of the operation + total: Total expected items/steps (default: 100 for percentage) + """ + self.ctx = ctx + self.operation = operation + self.total = total + self.current = 0 + self._last_reported = 0 + self._report_threshold = max(1, total // 20) # Report every 5% at minimum + + async def update(self, progress: int = None, message: str = None): + """Update progress, reporting to client if threshold reached. 
+ + Args: + progress: Current progress value (if None, increments by 1) + message: Optional status message to include + """ + if progress is not None: + self.current = progress + else: + self.current += 1 + + # Only report if we've passed the threshold (avoid spamming) + if self.ctx and (self.current - self._last_reported >= self._report_threshold + or self.current >= self.total): + try: + await self.ctx.report_progress( + progress=self.current, + total=self.total + ) + if message: + await self.ctx.info(f"{self.operation}: {message}") + self._last_reported = self.current + except Exception: + pass # Ignore progress reporting errors + + async def info(self, message: str): + """Send an info message to the client.""" + if self.ctx: + try: + await self.ctx.info(f"{self.operation}: {message}") + except Exception: + pass + + async def complete(self, message: str = None): + """Mark operation as complete.""" + self.current = self.total + if self.ctx: + try: + await self.ctx.report_progress(progress=self.total, total=self.total) + if message: + await self.ctx.info(message.format( + count=self.current, + total=self.total, + operation=self.operation + )) + except Exception: + pass + + +async def report_progress(ctx: Context, progress: int, total: int, message: str = None): + """Convenience function for one-off progress updates. + + Args: + ctx: FastMCP Context + progress: Current progress value + total: Total expected value + message: Optional status message + """ + if ctx: + try: + await ctx.report_progress(progress=progress, total=total) + if message: + await ctx.info(message) + except Exception: + pass + + +async def report_step(ctx: Context, step: int, total_steps: int, description: str): + """Report a discrete step in a multi-step operation. 
+ + Args: + ctx: FastMCP Context + step: Current step number (1-indexed) + total_steps: Total number of steps + description: What this step is doing + """ + if ctx: + try: + await ctx.report_progress(progress=step, total=total_steps) + await ctx.info(f"Step {step}/{total_steps}: {description}") + except Exception: + pass + + def compile_safe_pattern(pattern: str, flags: int = 0) -> re.Pattern: """Compile regex pattern with ReDoS protection @@ -1154,7 +1274,7 @@ def handle_sigint(signum, frame): # Resources provide information that can be loaded directly into context # They focus on data and minimize metadata -@mcp.resource(uri="/instance/{port}") +@mcp.resource(uri="ghidra://instance/{port}") def ghidra_instance(port: int = None) -> dict: """Get detailed information about a Ghidra instance and the loaded program @@ -1201,7 +1321,7 @@ def ghidra_instance(port: int = None) -> dict: return instance_info -@mcp.resource(uri="/instance/{port}/function/decompile/address/{address}") +@mcp.resource(uri="ghidra://instance/{port}/function/decompile/address/{address}") def decompiled_function_by_address(port: int = None, address: str = None) -> str: """Get decompiled C code for a function by address @@ -1250,7 +1370,7 @@ def decompiled_function_by_address(port: int = None, address: str = None) -> str return "Error: Could not extract decompiled code from response" -@mcp.resource(uri="/instance/{port}/function/decompile/name/{name}") +@mcp.resource(uri="ghidra://instance/{port}/function/decompile/name/{name}") def decompiled_function_by_name(port: int = None, name: str = None) -> str: """Get decompiled C code for a function by name @@ -1299,7 +1419,7 @@ def decompiled_function_by_name(port: int = None, name: str = None) -> str: return "Error: Could not extract decompiled code from response" -@mcp.resource(uri="/instance/{port}/function/info/address/{address}") +@mcp.resource(uri="ghidra://instance/{port}/function/info/address/{address}") def function_info_by_address(port: int = 
None, address: str = None) -> dict: """Get detailed information about a function by address @@ -1343,7 +1463,7 @@ def function_info_by_address(port: int = None, address: str = None) -> dict: # Return just the function data without API metadata return simplified["result"] -@mcp.resource(uri="/instance/{port}/function/info/name/{name}") +@mcp.resource(uri="ghidra://instance/{port}/function/info/name/{name}") def function_info_by_name(port: int = None, name: str = None) -> dict: """Get detailed information about a function by name @@ -1387,7 +1507,7 @@ def function_info_by_name(port: int = None, name: str = None) -> dict: # Return just the function data without API metadata return simplified["result"] -@mcp.resource(uri="/instance/{port}/function/disassembly/address/{address}") +@mcp.resource(uri="ghidra://instance/{port}/function/disassembly/address/{address}") def disassembly_by_address(port: int = None, address: str = None) -> str: """Get disassembled instructions for a function by address @@ -1447,7 +1567,7 @@ def disassembly_by_address(port: int = None, address: str = None) -> str: return "Error: Could not extract disassembly from response" -@mcp.resource(uri="/instance/{port}/function/disassembly/name/{name}") +@mcp.resource(uri="ghidra://instance/{port}/function/disassembly/name/{name}") def disassembly_by_name(port: int = None, name: str = None) -> str: """Get disassembled instructions for a function by name @@ -1512,7 +1632,7 @@ def disassembly_by_name(port: int = None, name: str = None) -> str: # Lightweight read-only resources for listing/enumerating Ghidra data # More efficient than tool calls for simple data access -@mcp.resource(uri="/instances") +@mcp.resource(uri="ghidra://instances") def resource_instances_list() -> dict: """List all active Ghidra instances @@ -1544,7 +1664,7 @@ def resource_instances_list() -> dict: } -@mcp.resource(uri="/instance/{port}/functions") +@mcp.resource(uri="ghidra://instance/{port}/functions") def 
resource_functions_list(port: int = None) -> dict: """List all functions in the program (lightweight enumeration) @@ -1589,7 +1709,7 @@ def resource_functions_list(port: int = None) -> dict: } -@mcp.resource(uri="/instance/{port}/strings") +@mcp.resource(uri="ghidra://instance/{port}/strings") def resource_strings_list(port: int = None) -> dict: """List defined strings in the program (lightweight enumeration) @@ -1633,7 +1753,7 @@ def resource_strings_list(port: int = None) -> dict: } -@mcp.resource(uri="/instance/{port}/data") +@mcp.resource(uri="ghidra://instance/{port}/data") def resource_data_list(port: int = None) -> dict: """List defined data items in the program (lightweight enumeration) @@ -1677,7 +1797,7 @@ def resource_data_list(port: int = None) -> dict: } -@mcp.resource(uri="/instance/{port}/structs") +@mcp.resource(uri="ghidra://instance/{port}/structs") def resource_structs_list(port: int = None) -> dict: """List defined struct types in the program (lightweight enumeration) @@ -1721,7 +1841,7 @@ def resource_structs_list(port: int = None) -> dict: } -@mcp.resource(uri="/instance/{port}/xrefs/to/{address}") +@mcp.resource(uri="ghidra://instance/{port}/xrefs/to/{address}") def resource_xrefs_to(port: int = None, address: str = None) -> dict: """List cross-references TO an address (lightweight enumeration) @@ -1770,7 +1890,7 @@ def resource_xrefs_to(port: int = None, address: str = None) -> dict: } -@mcp.resource(uri="/instance/{port}/xrefs/from/{address}") +@mcp.resource(uri="ghidra://instance/{port}/xrefs/from/{address}") def resource_xrefs_from(port: int = None, address: str = None) -> dict: """List cross-references FROM an address (lightweight enumeration) @@ -1819,7 +1939,7 @@ def resource_xrefs_from(port: int = None, address: str = None) -> dict: } -@mcp.resource(uri="/instance/{port}/summary") +@mcp.resource(uri="ghidra://instance/{port}/summary") def resource_program_summary(port: int = None) -> dict: """Get a comprehensive summary of the loaded 
program @@ -2089,7 +2209,2578 @@ def reverse_engineer_binary_prompt(port: int = None): - Use data_* tools to work with program data """, "context": { - "program_info": program_info + "program_info": program_info + } + } + +@mcp.prompt("analyze_strings") +def analyze_strings_prompt(port: int = None, pattern: str = None): + """A prompt to analyze string references in the binary + + Useful for finding hardcoded paths, URLs, error messages, and other interesting strings. + + Args: + port: Specific Ghidra instance port (optional) + pattern: Optional grep pattern to filter strings (e.g., "http", "password", "error") + """ + port = _get_instance_port(port) + + # Get strings from the binary + strings_result = data_list_strings(port=port, page_size=100, grep=pattern, grep_ignorecase=True) + + strings_list = [] + if isinstance(strings_result, dict): + strings_list = strings_result.get("strings", strings_result.get("items", [])) + + # Format strings for display + strings_display = "\n".join([ + f" {s.get('address', 'N/A')}: {s.get('value', s.get('string', str(s)))[:80]}" + for s in strings_list[:50] + ]) if strings_list else "No strings found matching criteria" + + filter_note = f" matching '{pattern}'" if pattern else "" + + return { + "prompt": f""" + # String Analysis for Binary + + Analyze the following strings{filter_note} found in the binary: + + ``` +{strings_display} + ``` + + Total strings shown: {len(strings_list[:50])} of {len(strings_list)} + + ## Analysis Tasks: + + 1. **Categorize Strings**: Group strings by type: + - File paths and system locations + - URLs and network addresses + - Error messages and debug strings + - Format strings (printf-style) + - Cryptographic constants or keys + - Configuration values + - User-visible messages + + 2. 
**Identify Interesting Patterns**: + - Look for hardcoded credentials or API keys + - Find debug/logging messages that reveal functionality + - Locate error handlers and their messages + - Identify protocol-related strings + + 3. **Cross-Reference Analysis**: + - For interesting strings, use xrefs_list to find where they're used + - Trace back to understand the context of usage + + 4. **Security Implications**: + - Note any strings that suggest security features + - Identify potential information disclosure + - Look for authentication/authorization related strings + + ## Recommended Follow-up Tools: + - `xrefs_list(address="")` - Find code using a string + - `functions_decompile(address="")` - Analyze functions using interesting strings + - `data_list_strings(grep="")` - Search for more specific patterns + """, + "context": { + "strings_count": len(strings_list), + "filter_pattern": pattern, + "sample_strings": strings_list[:20] + } + } + +@mcp.prompt("trace_data_flow") +def trace_data_flow_prompt(name: str = None, address: str = None, port: int = None): + """A prompt to trace data flow through a function + + Analyzes how data moves through a function, tracking inputs to outputs. 
+ + Args: + name: Function name (mutually exclusive with address) + address: Function address in hex format + port: Specific Ghidra instance port (optional) + """ + port = _get_instance_port(port) + + # Get function info + fn_info = None + decompiled = "" + variables = [] + dataflow = [] + + if address: + fn_info = function_info_by_address(address=address, port=port) + decompiled = decompiled_function_by_address(address=address, port=port) + vars_result = function_variables_by_address(address=address, port=port) + dataflow_result = analysis_get_dataflow(address=address, port=port, page_size=50) + elif name: + fn_info = function_info_by_name(name=name, port=port) + decompiled = decompiled_function_by_name(name=name, port=port) + vars_result = function_variables_by_name(name=name, port=port) + # Get address for dataflow + if isinstance(fn_info, dict) and "entry_point" in fn_info: + dataflow_result = analysis_get_dataflow(address=fn_info["entry_point"], port=port, page_size=50) + else: + dataflow_result = {} + else: + return {"prompt": "Error: Must provide either name or address", "context": {}} + + if isinstance(vars_result, dict): + variables = vars_result.get("variables", []) + + if isinstance(dataflow_result, dict): + dataflow = dataflow_result.get("dataflow", dataflow_result.get("items", [])) + + # Format variables + vars_display = "\n".join([ + f" {v.get('name', 'N/A')}: {v.get('type', 'unknown')} ({v.get('storage', 'N/A')})" + for v in variables[:20] + ]) if variables else "No variables found" + + func_name = name or address + if isinstance(fn_info, dict): + func_name = fn_info.get("name", func_name) + + return { + "prompt": f""" + # Data Flow Analysis: {func_name} + + ## Decompiled Code: + ```c +{decompiled} + ``` + + ## Variables: + ``` +{vars_display} + ``` + + ## Analysis Tasks: + + 1. 
**Input Identification**: + - Identify all function parameters and their types + - Find global variables accessed by this function + - Locate any data read from external sources (files, network, etc.) + + 2. **Data Transformation Tracking**: + - Trace how input data is modified through the function + - Identify any encoding/decoding operations + - Note arithmetic or bitwise operations on data + - Track buffer copies and string manipulations + + 3. **Output Analysis**: + - Identify return values and their sources + - Find any output parameters (pointers modified) + - Locate data written to external destinations + + 4. **Taint Analysis**: + - Mark user-controlled inputs as "tainted" + - Trace tainted data through the function + - Identify if tainted data reaches sensitive operations: + * Memory allocation sizes + * Array indices + * Format strings + * System calls + * Cryptographic functions + + 5. **Data Dependencies**: + - Map dependencies between variables + - Identify critical paths where data must be validated + - Note any sanitization or validation routines + + ## Security Focus: + - Does user input reach memory operations without bounds checking? + - Is data properly validated before use in sensitive contexts? + - Are there any type confusions or integer issues? + + ## Recommended Follow-up: + - `analysis_get_dataflow(address="...")` - Get detailed dataflow graph + - `xrefs_list(address="...")` - Find callers to understand input sources + - `analysis_get_callgraph(address="...")` - See what this function calls + """, + "context": { + "function_info": fn_info, + "variables": variables, + "dataflow_sample": dataflow[:10] + } + } + +@mcp.prompt("identify_crypto") +async def identify_crypto_prompt(port: int = None, ctx: Context = None): + """A prompt to identify cryptographic functions and constants in the binary + + Searches for crypto-related patterns, constants, and function signatures. + Reports progress during multi-pattern scanning. 
+ + Args: + port: Specific Ghidra instance port (optional) + ctx: FastMCP context for progress reporting (auto-injected) + """ + port = _get_instance_port(port) + + # Search for common crypto-related function names (15 patterns + 5 strings = 20 steps) + crypto_patterns = [ + "crypt", "cipher", "aes", "des", "rsa", "sha", "md5", "hash", + "encrypt", "decrypt", "key", "ssl", "tls", "hmac", "pbkdf" + ] + total_steps = len(crypto_patterns) + 5 # function patterns + string patterns + + found_functions = [] + for idx, pattern in enumerate(crypto_patterns, start=1): + await report_step(ctx, idx, total_steps, f"Scanning functions for '{pattern}'") + funcs_result = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=20) + if isinstance(funcs_result, dict): + funcs = funcs_result.get("functions", funcs_result.get("items", [])) + found_functions.extend(funcs) + + # Remove duplicates (by address) + seen_addrs = set() + unique_funcs = [] + for f in found_functions: + addr = f.get("address", f.get("entry_point", str(f))) + if addr not in seen_addrs: + seen_addrs.add(addr) + unique_funcs.append(f) + + # Search for crypto-related strings + crypto_strings = [] + string_patterns = ["BEGIN.*KEY", "-----", "AES", "RSA", "SHA"] + for idx, pattern in enumerate(string_patterns, start=len(crypto_patterns) + 1): + await report_step(ctx, idx, total_steps, f"Scanning strings for '{pattern}'") + strings_result = data_list_strings(port=port, grep=pattern, grep_ignorecase=True, page_size=10) + if isinstance(strings_result, dict): + strs = strings_result.get("strings", strings_result.get("items", [])) + crypto_strings.extend(strs) + + # Format output + funcs_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in unique_funcs[:30] + ]) if unique_funcs else "No obvious crypto functions found by name" + + strings_display = "\n".join([ + f" {s.get('address', 'N/A')}: {str(s.get('value', s.get('string', s)))[:60]}" + for 
s in crypto_strings[:20] + ]) if crypto_strings else "No obvious crypto strings found" + + return { + "prompt": f""" + # Cryptographic Analysis + + ## Potentially Crypto-Related Functions: + ``` +{funcs_display} + ``` + + ## Potentially Crypto-Related Strings: + ``` +{strings_display} + ``` + + ## Analysis Tasks: + + 1. **Identify Crypto Libraries**: + - Look for OpenSSL, mbedTLS, wolfSSL, or other library signatures + - Check for statically linked crypto code + - Identify any custom implementations + + 2. **Algorithm Identification**: + - **Symmetric**: AES, DES, 3DES, ChaCha20, RC4, Blowfish + - **Asymmetric**: RSA, ECC, DH, DSA + - **Hash**: SHA-1/256/512, MD5, BLAKE2 + - **MAC**: HMAC, CMAC, Poly1305 + - **KDF**: PBKDF2, scrypt, Argon2 + + 3. **Constant Analysis**: + Look for these magic constants: + - AES S-box: 0x63, 0x7c, 0x77, 0x7b... + - SHA-256 init: 0x6a09e667, 0xbb67ae85... + - MD5 init: 0x67452301, 0xefcdab89... + - RSA public exponent: 0x10001 (65537) + + 4. **Key Handling**: + - How are keys generated or derived? + - Where are keys stored? + - Are keys properly protected in memory? + - Is there key rotation or expiration? + + 5. **Implementation Review**: + - Check for weak algorithms (MD5, SHA-1, RC4, DES) + - Look for ECB mode usage (insecure for most cases) + - Verify IV/nonce handling (should be random/unique) + - Check for hardcoded keys or IVs + + 6. **Security Concerns**: + - Timing side-channels in comparisons + - Insufficient key lengths + - Poor random number generation + - Key material in logs or error messages + + ## Recommended Follow-up: + - Decompile identified crypto functions for detailed analysis + - Check xrefs to understand where crypto is used + - Look for key generation/storage functions + - Search for random number generation (rand, /dev/urandom, etc.) 
+ """, + "context": { + "crypto_functions": unique_funcs[:20], + "crypto_strings": crypto_strings[:10], + "function_count": len(unique_funcs) + } + } + +@mcp.prompt("malware_triage") +async def malware_triage_prompt(port: int = None, ctx: Context = None): + """A prompt for quick malware triage and analysis + + Provides a structured approach to initial malware analysis. + Reports progress during multi-step data gathering. + + Args: + port: Specific Ghidra instance port (optional) + ctx: FastMCP context for progress reporting (auto-injected) + """ + port = _get_instance_port(port) + + # Total steps: 1 (program info) + 6 (entry points) + 8 (strings) + 6 (functions) = 21 + total_steps = 21 + current_step = 0 + + # Get program info + await report_step(ctx, 1, total_steps, "Getting program info") + program_info = ghidra_instance(port=port) + current_step = 1 + + # Get entry points and main functions + main_funcs = [] + entry_names = ["main", "_main", "WinMain", "DllMain", "start", "_start"] + for i, name in enumerate(entry_names): + current_step += 1 + await report_progress(ctx, current_step, total_steps, f"Searching for {name}") + funcs = functions_list(port=port, grep=f"^{name}$", page_size=5) + if isinstance(funcs, dict): + main_funcs.extend(funcs.get("functions", funcs.get("items", []))) + + # Search for suspicious strings + suspicious_patterns = [ + "cmd.exe", "powershell", "/bin/sh", "CreateRemoteThread", + "VirtualAlloc", "WriteProcessMemory", "http://", "https://", + ] + + suspicious_strings = [] + for i, pattern in enumerate(suspicious_patterns): + current_step += 1 + await report_progress(ctx, current_step, total_steps, f"Scanning strings for '{pattern}'") + strings_result = data_list_strings(port=port, grep=pattern, grep_ignorecase=True, page_size=5) + if isinstance(strings_result, dict): + strs = strings_result.get("strings", strings_result.get("items", [])) + for s in strs: + s['_pattern'] = pattern + suspicious_strings.extend(strs) + + # Search for 
suspicious imports/functions + suspicious_funcs = [] + func_patterns = ["Virtual", "CreateThread", "LoadLibrary", "GetProcAddress", "Shell", "Inject"] + for i, pattern in enumerate(func_patterns): + current_step += 1 + await report_progress(ctx, current_step, total_steps, f"Scanning functions for '{pattern}'") + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=5) + if isinstance(funcs, dict): + suspicious_funcs.extend(funcs.get("functions", funcs.get("items", []))) + + # Format outputs + main_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in main_funcs + ]) if main_funcs else "No standard entry points found" + + strings_display = "\n".join([ + f" [{s.get('_pattern', '?')}] {s.get('address', 'N/A')}: {str(s.get('value', s.get('string', s)))[:50]}" + for s in suspicious_strings[:20] + ]) if suspicious_strings else "No suspicious strings found" + + funcs_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in suspicious_funcs[:20] + ]) if suspicious_funcs else "No suspicious functions found" + + return { + "prompt": f""" + # Malware Triage Analysis + + **Binary**: {program_info.get('program_name', 'unknown')} + **Format**: {program_info.get('format', 'unknown')} + **Architecture**: {program_info.get('processor', 'unknown')} + + ## Entry Points: + ``` +{main_display} + ``` + + ## Suspicious Strings: + ``` +{strings_display} + ``` + + ## Suspicious Functions: + ``` +{funcs_display} + ``` + + ## Triage Checklist: + + ### 1. Static Indicators + - [ ] Check for packed/obfuscated sections + - [ ] Identify compiler and build artifacts + - [ ] Look for anti-analysis techniques + - [ ] Check import table for suspicious APIs + - [ ] Examine strings for IOCs (IPs, domains, paths) + + ### 2. 
Capability Assessment + + **Persistence Mechanisms:** + - Registry modifications (RegSetValue, RegCreateKey) + - Service creation (CreateService, StartService) + - Scheduled tasks + - Startup folder modifications + + **Network Capabilities:** + - C2 communication patterns + - Data exfiltration methods + - Download/upload functionality + - Protocol usage (HTTP, DNS, custom) + + **Process Manipulation:** + - Process injection (WriteProcessMemory, CreateRemoteThread) + - Process hollowing + - DLL injection + - Thread hijacking + + **Evasion Techniques:** + - Anti-debugging (IsDebuggerPresent, CheckRemoteDebugger) + - Anti-VM detection + - Timing checks + - Environment checks + + **Payload Delivery:** + - Shellcode execution + - Reflective loading + - File dropping + - Memory-only execution + + ### 3. Priority Functions to Analyze + 1. Entry point / main function + 2. Functions with network-related names + 3. Functions calling VirtualAlloc + Write + Execute + 4. Functions with obfuscated names or unusual patterns + 5. Error handlers and cleanup routines + + ### 4. IOC Extraction + - Extract all URLs, IPs, and domains + - Note file paths and registry keys + - Document mutex names + - Record any hardcoded credentials + + ## Recommended Follow-up: + - `functions_decompile(name="")` - Analyze main logic + - `xrefs_list(address="")` - Find usage patterns + - `data_list_strings(grep="")` - Search for more IOCs + - `analysis_get_callgraph(address="...")` - Map execution flow + """, + "context": { + "program_info": program_info, + "entry_points": main_funcs, + "suspicious_strings_count": len(suspicious_strings), + "suspicious_funcs_count": len(suspicious_funcs) + } + } + +@mcp.prompt("analyze_protocol") +def analyze_protocol_prompt(name: str = None, address: str = None, port: int = None): + """A prompt to analyze network or file protocol handling + + Helps reverse engineer protocol parsers and handlers. 
+ + Args: + name: Function name to analyze (optional) + address: Function address to analyze (optional) + port: Specific Ghidra instance port (optional) + """ + port = _get_instance_port(port) + + # If specific function provided, get its details + target_decompiled = "" + target_info = None + + if address: + target_decompiled = decompiled_function_by_address(address=address, port=port) + target_info = function_info_by_address(address=address, port=port) + elif name: + target_decompiled = decompiled_function_by_name(name=name, port=port) + target_info = function_info_by_name(name=name, port=port) + + # Search for protocol-related functions + protocol_patterns = ["parse", "read", "recv", "process", "handle", "decode", "packet", "message", "frame", "header"] + + protocol_funcs = [] + for pattern in protocol_patterns[:5]: + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=10) + if isinstance(funcs, dict): + protocol_funcs.extend(funcs.get("functions", funcs.get("items", []))) + + # Deduplicate + seen = set() + unique_funcs = [] + for f in protocol_funcs: + addr = f.get("address", f.get("entry_point", str(f))) + if addr not in seen: + seen.add(addr) + unique_funcs.append(f) + + funcs_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in unique_funcs[:25] + ]) if unique_funcs else "No obvious protocol functions found" + + target_section = "" + if target_decompiled: + func_name = name or address + if isinstance(target_info, dict): + func_name = target_info.get("name", func_name) + target_section = f""" + ## Target Function: {func_name} + ```c +{target_decompiled} + ``` + """ + + return { + "prompt": f""" + # Protocol Analysis + {target_section} + ## Potentially Protocol-Related Functions: + ``` +{funcs_display} + ``` + + ## Analysis Framework: + + ### 1. 
Message Structure + - **Header Analysis**: Identify fixed-size headers + - Magic bytes / signature + - Version field + - Message type / opcode + - Length field(s) + - Flags / options + - Checksum / CRC + + - **Payload Analysis**: Variable-length data + - Field delimiters + - Length-prefixed fields + - Nested structures + - Padding / alignment + + ### 2. State Machine + - Identify protocol states (init, handshake, established, etc.) + - Map state transitions + - Find state storage variables + - Identify timeout handling + + ### 3. Message Types + For each message type, document: + - Opcode / type identifier + - Required fields + - Optional fields + - Expected responses + - Error conditions + + ### 4. Parsing Logic + - Buffer handling (how is input buffered?) + - Boundary checking (are lengths validated?) + - Error handling (what happens on malformed input?) + - Memory management (allocations, frees) + + ### 5. Security Analysis + - Integer overflows in length calculations + - Buffer overflows from unchecked lengths + - Format string issues + - Injection vulnerabilities + - Authentication/authorization checks + - Encryption/signing of messages + + ### 6. Documentation Format + Create protocol documentation: + ``` + +--------+--------+--------+--------+ + | Magic | Ver | Type | Length | + +--------+--------+--------+--------+ + | Payload... | + +--------+--------+--------+--------+ + ``` + + ## Recommended Analysis Flow: + 1. Find the main receive/read loop + 2. Identify the dispatch table or switch statement + 3. Analyze each message handler + 4. Document the message format + 5. Look for authentication handshakes + 6. 
Check for encryption setup
+
+    ## Recommended Tools:
+    - `functions_decompile(name="...")` - Analyze handler functions
+    - `structs_list()` / `structs_get()` - Find message structures
+    - `data_list_strings(grep="error")` - Find error messages
+    - `analysis_get_callgraph(address="...")` - Map handler relationships
+    """,
+        "context": {
+            "target_function": target_info,
+            "protocol_functions": unique_funcs[:15]
+        }
+    }
+
+@mcp.prompt("find_main_logic")
+async def find_main_logic_prompt(port: int = None, ctx: Context = None):
+    """A prompt to find the main application logic past runtime initialization
+
+    Helps navigate past CRT startup, library initialization, and boilerplate
+    to find where the actual program logic begins. Reports progress during scanning.
+
+    Args:
+        port: Specific Ghidra instance port (optional)
+        ctx: FastMCP context for progress reporting (auto-injected)
+    """
+    port = _get_instance_port(port)
+
+    # Get program info (step 1 of ~22: 17 entry names + 3 init + 1 info + 1 decompile)
+    await report_step(ctx, 1, 22, "Getting program info")
+    program_info = ghidra_instance(port=port)
+
+    # Find entry points and potential main functions
+    entry_candidates = []
+
+    # Standard entry point names across platforms
+    entry_names = [
+        # Unix/Linux
+        "main", "_main", "__main", "start", "_start", "__libc_start_main",
+        # Windows
+        "WinMain", "wWinMain", "wmain", "_wmain", "WinMainCRTStartup",
+        "mainCRTStartup", "wmainCRTStartup", "wWinMainCRTStartup",
+        # Windows DLL
+        "DllMain", "DllMainCRTStartup", "_DllMainCRTStartup@12",
+        # macOS entry names ("_main", "start") are already covered by the
+        # Unix/Linux group above; repeating them here double-counted scan
+        # steps and pushed report_step past the declared total of 22.
+    ]
+
+    for idx, name in enumerate(entry_names, start=2):
+        await report_step(ctx, idx, 22, f"Searching for {name}")
+        funcs = functions_list(port=port, grep=f"^{name}$", page_size=5)
+        if isinstance(funcs, dict):
+            for f in funcs.get("functions", funcs.get("items", [])):
+                f['_match_type'] = 'exact_name'
+                entry_candidates.append(f)
+
+    # Also search for functions with "init" or "setup" that might be called
early + init_patterns = ["init", "setup", "initialize"] + init_funcs = [] + base_step = 2 + len(entry_names) # 19 + for idx, pattern in enumerate(init_patterns, start=base_step): + await report_step(ctx, idx, 22, f"Scanning for {pattern} functions") + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=10) + if isinstance(funcs, dict): + init_funcs.extend(funcs.get("functions", funcs.get("items", []))[:5]) + + # Get decompilation of main entry point if found + main_decompiled = "" + main_entry = None + for candidate in entry_candidates: + name = candidate.get("name", "") + if name.lower() in ["main", "_main", "winmain", "wwinmain"]: + main_entry = candidate + addr = candidate.get("address", candidate.get("entry_point")) + if addr: + main_decompiled = decompiled_function_by_address(address=addr, port=port) + break + + # Format entry points + entries_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in entry_candidates[:15] + ]) if entry_candidates else "No standard entry points found" + + init_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in init_funcs[:10] + ]) if init_funcs else "No initialization functions found" + + main_section = "" + if main_decompiled and main_entry: + main_section = f""" +## Main Function: {main_entry.get('name', 'unknown')} +```c +{main_decompiled} +``` +""" + + return { + "prompt": f""" +# Finding the Main Application Logic + +**Binary**: {program_info.get('program_name', 'unknown')} +**Architecture**: {program_info.get('language', 'unknown')} + +## Entry Point Candidates +``` +{entries_display} +``` + +## Initialization Functions +``` +{init_display} +``` +{main_section} +## Analysis Strategy + +### Phase 1: Identify True Entry Point +The program's execution flow typically follows this pattern: + +``` +OS Loader + └─→ _start / Entry Point (CRT startup) + └─→ __libc_start_main (glibc) / 
mainCRTStartup (MSVC) + └─→ Global constructors (__init_array, .ctors) + └─→ main() / WinMain() ← ACTUAL LOGIC STARTS HERE +``` + +**For ELF binaries:** +1. `_start` calls `__libc_start_main(main, argc, argv, ...)` +2. Look for the first argument passed to `__libc_start_main` - that's `main` +3. Or find function called after `__libc_csu_init` + +**For PE binaries:** +1. Entry point is usually `mainCRTStartup` or `WinMainCRTStartup` +2. Look for call to `main`/`WinMain` after `__security_init_cookie` +3. Check for `_initterm` calls (global constructor invocation) + +### Phase 2: Navigate Past Boilerplate + +**Skip these patterns:** +- Security cookie initialization (`__security_init_cookie`) +- Heap initialization (`_heap_init`, `HeapCreate`) +- Locale/encoding setup (`setlocale`, `_setmbcp`) +- Exception handler registration (`__try`/`__except` setup) +- TLS callbacks (check `.tls` section) +- ATL/MFC initialization (look for `AfxWinMain`) + +**Find the real logic by looking for:** +- Command-line argument processing (`argc`, `argv`, `GetCommandLine`) +- Configuration file loading +- Main event loop or service dispatcher +- First significant branching based on user input + +### Phase 3: Map the Core Logic + +Once you find `main` or equivalent: +1. **Identify the primary dispatch pattern:** + - Is it a CLI tool (argument parsing → action)? + - Is it a service (initialization → main loop)? + - Is it a GUI app (window creation → message pump)? + +2. **Find the "inner main":** + - Many programs have a wrapper main that just calls the real logic + - Look for the function that receives parsed arguments + - Often named like `real_main`, `app_main`, `do_work`, etc. + +3. 
**Document the high-level flow:** + ``` + main() + ├── parse_arguments() + ├── initialize_subsystems() + ├── load_configuration() + └── run_main_loop() ← Primary logic here + ``` + +### Red Flags (Not Main Logic) +- Functions with `crt`, `init`, `startup` in name +- Functions that only call other init functions +- Functions setting up global state without processing input +- Exception handler registration functions + +## Recommended Next Steps +1. `functions_decompile(name="main")` - Analyze main if found +2. `analysis_get_callgraph(name="main")` - See what main calls +3. `xrefs_list(address="")` - Verify main is called from CRT +4. Look for the first function that processes `argc`/`argv` or user input +""", + "context": { + "program_info": program_info, + "entry_candidates": entry_candidates[:10], + "init_functions": init_funcs[:10], + "main_function": main_entry + } + } + +@mcp.prompt("analyze_imports") +async def analyze_imports_prompt(port: int = None, ctx: Context = None): + """A prompt to analyze the import table and understand binary capabilities + + Categorizes imports by functionality to quickly assess what a binary can do. + Reports progress during multi-category scanning. 
+ + Args: + port: Specific Ghidra instance port (optional) + ctx: FastMCP context for progress reporting (auto-injected) + """ + port = _get_instance_port(port) + + # Get program info + await report_step(ctx, 1, 12, "Getting program info") + program_info = ghidra_instance(port=port) + + # Define capability categories and their indicator functions + categories = { + "File Operations": ["CreateFile", "ReadFile", "WriteFile", "DeleteFile", "fopen", "fread", "fwrite", "open", "read", "write", "unlink", "remove"], + "Network": ["socket", "connect", "send", "recv", "WSAStartup", "getaddrinfo", "inet_", "http", "InternetOpen", "WinHttpOpen", "URLDownload"], + "Process/Thread": ["CreateProcess", "CreateThread", "CreateRemoteThread", "OpenProcess", "TerminateProcess", "fork", "exec", "pthread"], + "Memory": ["VirtualAlloc", "VirtualProtect", "WriteProcessMemory", "ReadProcessMemory", "mmap", "mprotect", "malloc", "HeapAlloc"], + "Registry (Windows)": ["RegOpenKey", "RegSetValue", "RegQueryValue", "RegCreateKey", "RegDeleteKey"], + "Crypto": ["Crypt", "BCrypt", "NCrypt", "AES", "RSA", "SHA", "MD5", "SSL", "TLS", "EVP_"], + "DLL/Library": ["LoadLibrary", "GetProcAddress", "dlopen", "dlsym", "FreeLibrary"], + "User Interface": ["MessageBox", "CreateWindow", "GetDlgItem", "DialogBox", "gtk_", "Qt"], + "Service": ["CreateService", "StartService", "OpenSCManager", "ControlService"], + "Debugging/Evasion": ["IsDebuggerPresent", "CheckRemoteDebugger", "NtQueryInformationProcess", "OutputDebugString", "ptrace"], + } + + # Search for functions matching each category (10 categories + 1 info + 1 analysis = 12 steps) + capability_results = {} + all_found = [] + + for idx, (category, patterns) in enumerate(categories.items(), start=2): + await report_step(ctx, idx, 12, f"Scanning {category}") + found = [] + for pattern in patterns[:5]: # Limit queries per category + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=10) + if isinstance(funcs, dict): + for f 
in funcs.get("functions", funcs.get("items", [])): + f['_category'] = category + f['_pattern'] = pattern + found.append(f) + all_found.append(f) + + # Deduplicate within category + seen = set() + unique = [] + for f in found: + addr = f.get("address", f.get("entry_point", str(f))) + if addr not in seen: + seen.add(addr) + unique.append(f) + + if unique: + capability_results[category] = unique + + # Format capability summary + capability_summary = [] + for category, funcs in capability_results.items(): + func_names = [f.get('name', 'unknown') for f in funcs[:5]] + extras = f" (+{len(funcs)-5} more)" if len(funcs) > 5 else "" + capability_summary.append(f"**{category}** ({len(funcs)} functions)") + capability_summary.append(f" └─ {', '.join(func_names)}{extras}") + + summary_display = "\n".join(capability_summary) if capability_summary else "No notable imports detected" + + # Identify suspicious combinations + suspicious_combos = [] + cats = set(capability_results.keys()) + + if "Memory" in cats and "Process/Thread" in cats: + if any("WriteProcessMemory" in f.get('name', '') for f in capability_results.get("Memory", [])): + suspicious_combos.append("⚠️ **Process Injection Pattern**: Memory + Process manipulation detected") + + if "Network" in cats and "Crypto" in cats: + suspicious_combos.append("🔐 **Encrypted Communication**: Network + Crypto APIs present") + + if "DLL/Library" in cats and "Memory" in cats: + suspicious_combos.append("⚠️ **Dynamic Loading Pattern**: LoadLibrary + Memory manipulation") + + if "Debugging/Evasion" in cats: + suspicious_combos.append("🛡️ **Anti-Analysis**: Debugger detection APIs present") + + if "Service" in cats and "Registry (Windows)" in cats: + suspicious_combos.append("📌 **Persistence Pattern**: Service + Registry access") + + combos_display = "\n".join(suspicious_combos) if suspicious_combos else "No suspicious combinations detected" + + return { + "prompt": f""" +# Import Analysis Report + +**Binary**: 
{program_info.get('program_name', 'unknown')} +**Architecture**: {program_info.get('language', 'unknown')} + +## Capability Summary + +{summary_display} + +## Suspicious Combinations +{combos_display} + +## Detailed Analysis Framework + +### 1. Capability Risk Assessment + +| Capability | Risk Level | Investigation Priority | +|------------|------------|----------------------| +| Process Injection (WriteProcessMemory + CreateRemoteThread) | 🔴 Critical | Immediate | +| Code Download & Execute (URLDownload + ShellExecute) | 🔴 Critical | Immediate | +| Anti-Debugging | 🟡 Medium | High | +| Registry Persistence | 🟡 Medium | High | +| Encrypted Network I/O | 🟡 Medium | Medium | +| Standard File I/O | 🟢 Low | Low | + +### 2. Import Pattern Analysis + +**Injection Indicators:** +- `VirtualAllocEx` + `WriteProcessMemory` + `CreateRemoteThread` = Classic injection +- `NtCreateThreadEx` + `NtMapViewOfSection` = Stealthier injection +- `SetWindowsHookEx` = DLL injection via hooks +- `QueueUserAPC` = APC injection + +**Evasion Indicators:** +- `IsDebuggerPresent`, `CheckRemoteDebuggerPresent` = Basic anti-debug +- `NtQueryInformationProcess` (ProcessDebugPort) = Advanced anti-debug +- `GetTickCount` comparisons = Timing-based detection +- `rdtsc` instruction usage = VM/sandbox detection + +**Persistence Indicators:** +- `RegSetValueEx` with Run/RunOnce keys +- `CreateService` / `ChangeServiceConfig` +- `SchRpcRegisterTask` = Scheduled tasks +- `CopyFile` to startup locations + +**Data Exfiltration Indicators:** +- `InternetOpen` + `InternetConnect` + `HttpSendRequest` +- `socket` + `connect` to non-standard ports +- `CryptEncrypt` before network send +- `compress` / `zip` functions before send + +### 3. 
Library-Specific Patterns + +**OpenSSL Indicators:** +- `SSL_CTX_new`, `SSL_connect`, `SSL_read`, `SSL_write` +- Likely secure communications + +**Windows Crypto API:** +- `CryptAcquireContext`, `CryptCreateHash`, `CryptEncrypt` +- Check for hardcoded keys or weak algorithms + +**Compression Libraries:** +- `deflate`, `inflate` (zlib) +- `LZ4_compress`, `LZ4_decompress` +- Often used before exfiltration + +### 4. Cross-Reference Strategy + +For each suspicious import: +1. Find all call sites: `xrefs_list(name="")` +2. Analyze calling functions: Look for the orchestrating function +3. Check data flow: What data reaches these calls? + +### 5. Priority Functions to Analyze + +Based on the imports found, prioritize: +1. Functions that call multiple suspicious APIs +2. Functions that set up network connections +3. Functions that manipulate other processes +4. Functions referenced from entry points + +## Recommended Next Steps +- `xrefs_list(name="")` - Find usage locations +- `functions_decompile(address="")` - Analyze calling code +- `analysis_get_callgraph(name="")` - Map the attack flow +- `data_list_strings(grep="http|ftp|\\\\\\\\")` - Find network destinations +""", + "context": { + "program_info": program_info, + "capabilities": {k: [f.get('name') for f in v[:10]] for k, v in capability_results.items()}, + "total_imports_analyzed": len(all_found), + "suspicious_patterns": suspicious_combos + } + } + +@mcp.prompt("find_authentication") +async def find_authentication_prompt(port: int = None, ctx: Context = None): + """A prompt to locate authentication, authorization, and credential handling code + + Helps find password validation, license checks, session management, and access control. + Reports progress during multi-pattern scanning. 
+ + Args: + port: Specific Ghidra instance port (optional) + ctx: FastMCP context for progress reporting (auto-injected) + """ + port = _get_instance_port(port) + + # Get program info + await report_step(ctx, 1, 30, "Getting program info") + program_info = ghidra_instance(port=port) + + # Search for authentication-related function names (17 patterns) + auth_patterns = [ + "auth", "login", "logon", "password", "passwd", "credential", + "verify", "validate", "check", "license", "serial", "key", + "token", "session", "permission", "access", "privilege" + ] + + auth_funcs = [] + for idx, pattern in enumerate(auth_patterns, start=2): + await report_step(ctx, idx, 30, f"Scanning functions for '{pattern}'") + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=10) + if isinstance(funcs, dict): + for f in funcs.get("functions", funcs.get("items", [])): + f['_pattern'] = pattern + auth_funcs.append(f) + + # Deduplicate + seen = set() + unique_auth = [] + for f in auth_funcs: + addr = f.get("address", f.get("entry_point", str(f))) + if addr not in seen: + seen.add(addr) + unique_auth.append(f) + + # Search for authentication-related strings (6 patterns) + auth_string_patterns = [ + "password", "invalid", "incorrect", "denied", "authorized", + "authentication", "license", "expired", "trial", "registered" + ] + + auth_strings = [] + base_step = 2 + len(auth_patterns) # 19 + for idx, pattern in enumerate(auth_string_patterns[:6], start=base_step): + await report_step(ctx, idx, 30, f"Scanning strings for '{pattern}'") + strings = data_list_strings(port=port, grep=pattern, grep_ignorecase=True, page_size=8) + if isinstance(strings, dict): + for s in strings.get("strings", strings.get("items", [])): + s['_pattern'] = pattern + auth_strings.append(s) + + # Search for crypto functions often used in auth (6 patterns) + crypto_patterns = ["hash", "sha", "md5", "bcrypt", "hmac", "pbkdf"] + crypto_auth = [] + base_step = 19 + 6 # 25 + for idx, pattern in 
enumerate(crypto_patterns, start=base_step): + await report_step(ctx, idx, 30, f"Scanning crypto '{pattern}'") + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=5) + if isinstance(funcs, dict): + crypto_auth.extend(funcs.get("functions", funcs.get("items", []))[:3]) + + # Format outputs + funcs_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')} [{f.get('_pattern', '')}]" + for f in unique_auth[:25] + ]) if unique_auth else "No obvious authentication functions found" + + strings_display = "\n".join([ + f" {s.get('address', 'N/A')}: \"{str(s.get('value', s.get('string', s)))[:50]}\" [{s.get('_pattern', '')}]" + for s in auth_strings[:20] + ]) if auth_strings else "No authentication-related strings found" + + crypto_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in crypto_auth[:10] + ]) if crypto_auth else "No crypto functions found" + + return { + "prompt": f""" +# Authentication & Authorization Analysis + +**Binary**: {program_info.get('program_name', 'unknown')} + +## Potential Authentication Functions +``` +{funcs_display} +``` + +## Authentication-Related Strings +``` +{strings_display} +``` + +## Cryptographic Functions (Often Used in Auth) +``` +{crypto_display} +``` + +## Analysis Framework + +### 1. 
Authentication Pattern Recognition + +**Password Validation Patterns:** +```c +// Pattern 1: Direct comparison (WEAK) +if (strcmp(input_password, "hardcoded") == 0) + +// Pattern 2: Hash comparison (Better) +hash = compute_hash(input_password); +if (memcmp(hash, stored_hash, 32) == 0) + +// Pattern 3: API-based (Best) +result = CheckCredentials(username, password); +``` + +**License Key Validation Patterns:** +```c +// Pattern 1: Checksum validation +if (compute_checksum(key) == expected) + +// Pattern 2: Algorithmic (XOR, math operations) +decoded = key ^ magic_constant; +if (decoded % prime == 0) + +// Pattern 3: Online validation +result = validate_with_server(key); +``` + +### 2. Common Vulnerability Points + +| Vulnerability | What to Look For | +|--------------|-----------------| +| Hardcoded credentials | String comparisons with constants | +| Weak hashing | MD5/SHA1 without salt | +| Bypassable checks | Single comparison that can be NOPed | +| Logic flaws | Inverted conditions, early returns | +| Timing attacks | Non-constant-time comparisons | +| Default credentials | Strings like "admin", "password", "default" | + +### 3. Finding the Auth Decision Point + +The critical point is usually: +``` + ┌─────────────┐ + │ Auth Check │ + └──────┬──────┘ + │ + ┌───────┴───────┐ + ▼ ▼ + [SUCCESS] [FAILURE] + Grant Access Deny/Error +``` + +**To find it:** +1. Locate error strings ("Invalid password", "Access denied") +2. Find xrefs to those strings +3. Look for the conditional branch before the error +4. The other branch leads to success path + +### 4. Session Management Analysis + +Look for: +- Token generation after successful auth +- Session ID storage (cookies, memory, files) +- Session timeout handling +- Session invalidation on logout + +**Session Token Red Flags:** +- Predictable generation (sequential, time-based) +- Insufficient entropy +- No expiration +- Stored in plaintext + +### 5. 
Privilege Escalation Points + +Check for: +- Role/permission checks: `if (user.role == ADMIN)` +- Capability flags: `if (flags & CAN_WRITE)` +- Group membership: `IsUserInGroup()` +- File/resource ACLs + +### 6. Bypass Strategies (For Security Research) + +**Binary Patching Targets:** +- JZ → JNZ (invert condition) +- CALL auth_check → NOP +- Return value modification + +**Runtime Bypass:** +- Hook authentication function +- Modify comparison result +- Inject valid session + +### 7. Recommended Analysis Flow + +``` +Step 1: Find auth strings + └─→ "Invalid password", "Access denied", etc. + +Step 2: Trace to calling function + └─→ xrefs_list(address="") + +Step 3: Analyze the decision logic + └─→ functions_decompile(address="") + +Step 4: Find the success path + └─→ What happens when auth succeeds? + +Step 5: Map the complete auth flow + └─→ analysis_get_callgraph(address="") +``` + +## Recommended Next Steps +- `xrefs_list(address="")` - Find code using auth messages +- `functions_decompile(name="")` - Analyze authentication logic +- `data_list_strings(grep="admin|root|password")` - Find potential credentials +- `analysis_get_callgraph(name="")` - Map auth code flow +""", + "context": { + "program_info": program_info, + "auth_functions": [f.get('name') for f in unique_auth[:15]], + "auth_strings": [s.get('value', s.get('string', ''))[:40] for s in auth_strings[:10]], + "crypto_functions": [f.get('name') for f in crypto_auth[:10]] + } + } + +@mcp.prompt("analyze_switch_table") +def analyze_switch_table_prompt(name: str = None, address: str = None, port: int = None): + """A prompt to analyze switch/dispatch tables for command processing + + Helps reverse engineer command handlers, protocol dispatchers, and menu systems. 
+ + Args: + name: Function name containing switch (optional) + address: Function address containing switch (optional) + port: Specific Ghidra instance port (optional) + """ + port = _get_instance_port(port) + + # Get program info + program_info = ghidra_instance(port=port) + + # If specific function provided, get its details + target_decompiled = "" + target_disasm = "" + target_info = None + + if address: + target_decompiled = decompiled_function_by_address(address=address, port=port) + target_disasm = disassembly_by_address(address=address, port=port) + target_info = function_info_by_address(address=address, port=port) + elif name: + target_decompiled = decompiled_function_by_name(name=name, port=port) + target_disasm = disassembly_by_name(name=name, port=port) + target_info = function_info_by_name(name=name, port=port) + + # Search for potential dispatch functions + dispatch_patterns = [ + "dispatch", "handler", "process", "handle", "command", "cmd", + "opcode", "switch", "route", "execute", "action" + ] + + dispatch_funcs = [] + for pattern in dispatch_patterns[:6]: + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=8) + if isinstance(funcs, dict): + dispatch_funcs.extend(funcs.get("functions", funcs.get("items", []))[:4]) + + # Deduplicate + seen = set() + unique_dispatch = [] + for f in dispatch_funcs: + addr = f.get("address", f.get("entry_point", str(f))) + if addr not in seen: + seen.add(addr) + unique_dispatch.append(f) + + # Format outputs + target_section = "" + if target_decompiled: + func_name = name or address + if isinstance(target_info, dict): + func_name = target_info.get("name", func_name) + target_section = f""" +## Target Function: {func_name} + +### Decompiled Code: +```c +{target_decompiled} +``` + +### Disassembly (for jump table analysis): +``` +{target_disasm[:3000] if target_disasm else "Not available"} +``` +""" + + dispatch_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: 
{f.get('name', 'unknown')}" + for f in unique_dispatch[:15] + ]) if unique_dispatch else "No obvious dispatch functions found" + + return { + "prompt": f""" +# Switch/Dispatch Table Analysis + +**Binary**: {program_info.get('program_name', 'unknown')} +{target_section} +## Potential Dispatch Functions +``` +{dispatch_display} +``` + +## Analysis Framework + +### 1. Identifying Switch Patterns + +**Compiler-Generated Patterns:** + +```c +// Direct switch (small, sparse values) +switch(cmd) {{ + case 1: handle_read(); break; + case 2: handle_write(); break; + case 5: handle_delete(); break; +}} +``` +Assembly: Series of CMP + JE instructions + +```c +// Jump table (dense sequential values) +switch(cmd) {{ + case 0: case 1: case 2: case 3: ... +}} +``` +Assembly: Bounds check + indirect jump via table + +```c +// Binary search (many sparse values) +switch(cmd) {{ + case 100: case 200: case 500: case 1000: ... +}} +``` +Assembly: Nested CMP comparisons + +### 2. Jump Table Recognition + +**x86/x64 Pattern:** +```asm +cmp eax, MAX_CASE ; Bounds check +ja default_case ; Out of range +mov eax, [jump_table + rax*4] ; Load handler +jmp rax ; Indirect jump +``` + +**Ghidra Indicators:** +- Look for `switchD_` labels in disassembly +- Check for computed jumps (`jmp [reg + offset]`) +- Find tables of addresses in `.rodata` or `.rdata` + +### 3. Extracting Case Handlers + +For each case value, document: + +| Case | Value | Handler Address | Purpose | +|------|-------|-----------------|---------| +| 0 | 0x00 | 0x401000 | Initialize | +| 1 | 0x01 | 0x401050 | Read data | +| 2 | 0x02 | 0x4010A0 | Write data | +| ... | ... | ... | ... | + +### 4. 
Command Protocol Analysis + +**Common Dispatch Architectures:** + +``` +Type 1: Flat Dispatch +┌──────────────┐ +│ Read Command │ +└──────┬───────┘ + ▼ +┌──────────────┐ +│ switch(cmd) │──→ handler_1() +│ │──→ handler_2() +│ │──→ handler_3() +└──────────────┘ +``` + +``` +Type 2: Nested Dispatch +┌──────────────┐ +│ Read Group │ +└──────┬───────┘ + ▼ +┌──────────────┐ ┌─────────────┐ +│switch(group) │──→ │switch(subcmd)│ +└──────────────┘ └─────────────┘ +``` + +``` +Type 3: Function Pointer Table +┌──────────────────────────────────┐ +│ handlers[] = {{h1, h2, h3, ...}} │ +│ handlers[cmd]() │ +└──────────────────────────────────┘ +``` + +### 5. Reverse Engineering Strategy + +**Step 1: Find the dispatch point** +- Look for the main switch or function pointer call +- Identify the command/opcode variable + +**Step 2: Map all cases** +- Extract all case values +- Find corresponding handler addresses +- Note default/error handling + +**Step 3: Analyze each handler** +- What parameters does it receive? +- What actions does it perform? +- What does it return? + +**Step 4: Document the protocol** +``` +Command Format: +┌────────┬────────┬──────────┐ +│ OpCode │ Length │ Payload │ +│ 1 byte │ 2 bytes│ N bytes │ +└────────┴────────┴──────────┘ + +OpCode 0x01: READ + Payload: [offset:4][length:4] + Response: [data:length] + +OpCode 0x02: WRITE + Payload: [offset:4][length:4][data:length] + Response: [status:1] +``` + +### 6. Finding Hidden Commands + +**Look for:** +- Cases with no obvious string references (debug commands) +- Cases that check additional conditions (privileged commands) +- Default case that does something other than error +- Gaps in sequential case numbers + +### 7. 
Common Pitfalls + +- **Virtual dispatch**: C++ vtables look like switch tables +- **String switches**: May use hash-based dispatch +- **Multi-level switches**: Nested command/subcommand structure +- **Indirect handlers**: Function pointers read from data structures + +## Recommended Next Steps +- `functions_decompile(address="")` - Analyze individual handlers +- `xrefs_list(name="")` - Find what calls the dispatcher +- `data_list(grep="")` - Find jump tables in data +- `analysis_get_callgraph(address="")` - Map handler relationships +""", + "context": { + "program_info": program_info, + "target_function": target_info, + "dispatch_functions": [f.get('name') for f in unique_dispatch[:15]] + } + } + +@mcp.prompt("find_config_parsing") +async def find_config_parsing_prompt(port: int = None, ctx: Context = None): + """A prompt to identify configuration file parsing and settings management + + Helps find how a program reads, parses, and stores its configuration. + Reports progress during multi-category scanning. 
+ + Args: + port: Specific Ghidra instance port (optional) + ctx: FastMCP context for progress reporting (auto-injected) + """ + port = _get_instance_port(port) + + # Get program info (total: 8 config + 6 strings + 4 registry + 4 env + 1 info = 23) + await report_step(ctx, 1, 23, "Getting program info") + program_info = ghidra_instance(port=port) + + # Search for config-related functions (8 patterns) + config_patterns = [ + "config", "setting", "option", "preference", "pref", + "ini", "json", "xml", "yaml", "toml", "parse", + "load", "save", "read", "write" + ] + + config_funcs = [] + for idx, pattern in enumerate(config_patterns[:8], start=2): + await report_step(ctx, idx, 23, f"Scanning config functions: '{pattern}'") + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=8) + if isinstance(funcs, dict): + for f in funcs.get("functions", funcs.get("items", []))[:4]: + f['_pattern'] = pattern + config_funcs.append(f) + + # Deduplicate + seen = set() + unique_config = [] + for f in config_funcs: + addr = f.get("address", f.get("entry_point", str(f))) + if addr not in seen: + seen.add(addr) + unique_config.append(f) + + # Search for config-related strings (file paths, keys, defaults) (6 patterns) + config_strings = [] + string_patterns = [ + "\\.ini", "\\.json", "\\.xml", "\\.cfg", "\\.conf", + "config", "setting", "/etc/", "AppData", "HKEY_" + ] + + base_step = 2 + 8 # 10 + for idx, pattern in enumerate(string_patterns[:6], start=base_step): + await report_step(ctx, idx, 23, f"Scanning config strings: '{pattern}'") + strings = data_list_strings(port=port, grep=pattern, grep_ignorecase=True, page_size=8) + if isinstance(strings, dict): + config_strings.extend(strings.get("strings", strings.get("items", []))[:4]) + + # Search for registry functions (Windows) (4 patterns) + registry_patterns = ["RegOpen", "RegQuery", "RegSet", "RegGet"] + registry_funcs = [] + base_step = 10 + 6 # 16 + for idx, pattern in enumerate(registry_patterns, 
start=base_step): + await report_step(ctx, idx, 23, f"Scanning registry: '{pattern}'") + funcs = functions_list(port=port, grep=pattern, page_size=5) + if isinstance(funcs, dict): + registry_funcs.extend(funcs.get("functions", funcs.get("items", []))[:3]) + + # Search for environment variable functions (4 patterns) + env_patterns = ["getenv", "GetEnvironmentVariable", "setenv", "putenv"] + env_funcs = [] + base_step = 16 + 4 # 20 + for idx, pattern in enumerate(env_patterns, start=base_step): + await report_step(ctx, idx, 23, f"Scanning environment: '{pattern}'") + funcs = functions_list(port=port, grep=pattern, page_size=3) + if isinstance(funcs, dict): + env_funcs.extend(funcs.get("functions", funcs.get("items", []))[:2]) + + # Format outputs + config_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')} [{f.get('_pattern', '')}]" + for f in unique_config[:20] + ]) if unique_config else "No config-related functions found" + + strings_display = "\n".join([ + f" {s.get('address', 'N/A')}: \"{str(s.get('value', s.get('string', s)))[:60]}\"" + for s in config_strings[:15] + ]) if config_strings else "No config-related strings found" + + registry_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in registry_funcs[:8] + ]) if registry_funcs else "No registry functions found" + + env_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in env_funcs[:5] + ]) if env_funcs else "No environment functions found" + + return { + "prompt": f""" +# Configuration Analysis + +**Binary**: {program_info.get('program_name', 'unknown')} + +## Config-Related Functions +``` +{config_display} +``` + +## Config-Related Strings (File Paths, Keys) +``` +{strings_display} +``` + +## Registry Access (Windows) +``` +{registry_display} +``` + +## Environment Variable Access +``` +{env_display} +``` + +## Analysis Framework + 
+### 1. Configuration Sources + +**Priority Order (typical):** +``` +1. Command-line arguments (--config=X, -c X) +2. Environment variables ($APP_CONFIG, %APP_CONFIG%) +3. User config file (~/.apprc, %APPDATA%\\app\\config) +4. System config file (/etc/app.conf, %PROGRAMDATA%) +5. Compiled defaults (hardcoded fallbacks) +``` + +### 2. File Format Patterns + +**INI Format:** +```c +// Look for: +GetPrivateProfileString() // Windows API +fgets() + strchr('[') // Manual parsing +sscanf(line, "[%s]", section) +``` + +**JSON Format:** +```c +// Library indicators: +cJSON_Parse(), cJSON_GetObjectItem() // cJSON +json_loads(), json_object_get() // jansson +nlohmann::json // C++ nlohmann +``` + +**XML Format:** +```c +// Library indicators: +xmlReadFile(), xmlDocGetRootElement() // libxml2 +tinyxml2::XMLDocument // TinyXML2 +expat functions (XML_Parse) // Expat +``` + +**Custom Binary:** +```c +// Look for: +fread(&config_struct, sizeof(...)) +Magic number checks at file start +Version field parsing +``` + +### 3. Registry Configuration (Windows) + +**Common Locations:** +``` +HKEY_CURRENT_USER\\Software\\\\ +HKEY_LOCAL_MACHINE\\Software\\\\ +HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Services\\ +``` + +**Analysis Points:** +- What keys are read vs written? +- Are there fallback values if key missing? +- Is sensitive data stored (credentials, keys)? + +### 4. Environment Variables + +**Common Patterns:** +```c +// Direct usage +char* value = getenv("APP_DEBUG"); +if (value && strcmp(value, "1") == 0) {{ + debug_mode = true; +}} + +// With defaults +char* path = getenv("APP_CONFIG"); +if (!path) path = "/etc/app.conf"; +``` + +**Security Note:** Environment variables can leak to child processes! + +### 5. 
Configuration Structure Mapping + +Document the config schema: +``` +struct AppConfig {{ + // File locations + char log_path[256]; // from: log_file= + char data_dir[256]; // from: data_directory= + + // Network settings + char server_host[64]; // from: server= + int server_port; // from: port= + + // Feature flags + bool debug_enabled; // from: debug=true/false + int verbosity; // from: verbose=0-3 +}} +``` + +### 6. Default Value Discovery + +**Hardcoded defaults reveal expected values:** +```c +// These strings tell you valid options +if (!config.mode) + config.mode = "production"; // Modes: "production", "debug", "test"? + +if (config.timeout <= 0) + config.timeout = 30; // Default timeout: 30 seconds +``` + +### 7. Config Modification Vectors + +**For security research:** +- Can config file be written by unprivileged user? +- Are file paths validated (path traversal)? +- Is config file integrity verified? +- Can environment variables override secure settings? +- Are sensitive values encrypted at rest? + +### 8. 
Parsing Vulnerability Patterns + +| Pattern | Risk | Example | +|---------|------|---------| +| Unbounded string copy | Buffer overflow | `strcpy(cfg.name, value)` | +| Integer parsing | Overflow | `atoi()` without bounds | +| Path concatenation | Traversal | `sprintf(path, "%s/%s", dir, file)` | +| Format strings | Code exec | `printf(config_value)` | + +## Recommended Next Steps +- `xrefs_list(name="")` - Find where config is loaded +- `functions_decompile(name="")` - Analyze parsing logic +- `data_list_strings(grep="default|=")` - Find default values +- `structs_list()` - Look for config structure definitions +""", + "context": { + "program_info": program_info, + "config_functions": [f.get('name') for f in unique_config[:15]], + "config_strings": [str(s.get('value', s.get('string', '')))[:50] for s in config_strings[:10]], + "has_registry": len(registry_funcs) > 0, + "has_env": len(env_funcs) > 0 + } + } + +@mcp.prompt("compare_functions") +def compare_functions_prompt(func1_name: str = None, func1_address: str = None, + func2_name: str = None, func2_address: str = None, + port: int = None): + """A prompt to compare two functions for similarity analysis + + Useful for identifying library code, patches, or malware variants. 
+ + Args: + func1_name: First function name (optional if address provided) + func1_address: First function address (optional if name provided) + func2_name: Second function name (optional if address provided) + func2_address: Second function address (optional if name provided) + port: Specific Ghidra instance port (optional) + """ + port = _get_instance_port(port) + + # Get program info + program_info = ghidra_instance(port=port) + + # Get details for function 1 + func1_decompiled = "" + func1_disasm = "" + func1_info = None + func1_vars = [] + + if func1_address: + func1_decompiled = decompiled_function_by_address(address=func1_address, port=port) + func1_disasm = disassembly_by_address(address=func1_address, port=port) + func1_info = function_info_by_address(address=func1_address, port=port) + vars_result = function_variables_by_address(address=func1_address, port=port) + if isinstance(vars_result, dict): + func1_vars = vars_result.get("variables", []) + elif func1_name: + func1_decompiled = decompiled_function_by_name(name=func1_name, port=port) + func1_disasm = disassembly_by_name(name=func1_name, port=port) + func1_info = function_info_by_name(name=func1_name, port=port) + vars_result = function_variables_by_name(name=func1_name, port=port) + if isinstance(vars_result, dict): + func1_vars = vars_result.get("variables", []) + + # Get details for function 2 + func2_decompiled = "" + func2_disasm = "" + func2_info = None + func2_vars = [] + + if func2_address: + func2_decompiled = decompiled_function_by_address(address=func2_address, port=port) + func2_disasm = disassembly_by_address(address=func2_address, port=port) + func2_info = function_info_by_address(address=func2_address, port=port) + vars_result = function_variables_by_address(address=func2_address, port=port) + if isinstance(vars_result, dict): + func2_vars = vars_result.get("variables", []) + elif func2_name: + func2_decompiled = decompiled_function_by_name(name=func2_name, port=port) + func2_disasm = 
disassembly_by_name(name=func2_name, port=port) + func2_info = function_info_by_name(name=func2_name, port=port) + vars_result = function_variables_by_name(name=func2_name, port=port) + if isinstance(vars_result, dict): + func2_vars = vars_result.get("variables", []) + + # Get function identifiers + func1_id = func1_name or func1_address or "Function 1" + func2_id = func2_name or func2_address or "Function 2" + + if isinstance(func1_info, dict): + func1_id = func1_info.get("name", func1_id) + if isinstance(func2_info, dict): + func2_id = func2_info.get("name", func2_id) + + # Extract basic metrics + func1_lines = len(func1_decompiled.split('\n')) if func1_decompiled else 0 + func2_lines = len(func2_decompiled.split('\n')) if func2_decompiled else 0 + func1_var_count = len(func1_vars) + func2_var_count = len(func2_vars) + + return { + "prompt": f""" +# Function Comparison Analysis + +**Binary**: {program_info.get('program_name', 'unknown')} + +## Function 1: {func1_id} +**Lines**: {func1_lines} | **Variables**: {func1_var_count} + +```c +{func1_decompiled if func1_decompiled else "// Not available"} +``` + +--- + +## Function 2: {func2_id} +**Lines**: {func2_lines} | **Variables**: {func2_var_count} + +```c +{func2_decompiled if func2_decompiled else "// Not available"} +``` + +--- + +## Comparison Framework + +### 1. Structural Similarity Analysis + +**Control Flow Comparison:** +- Compare number of basic blocks +- Compare branching patterns (if/else, switch, loops) +- Compare nesting depth +- Compare cyclomatic complexity + +**Metric Summary:** +| Metric | {func1_id} | {func2_id} | Match | +|--------|------------|------------|-------| +| Line Count | {func1_lines} | {func2_lines} | {'✅' if abs(func1_lines - func2_lines) < 5 else '❌'} | +| Variables | {func1_var_count} | {func2_var_count} | {'✅' if abs(func1_var_count - func2_var_count) < 3 else '❌'} | + +### 2. 
Semantic Similarity Analysis + +**Look for equivalent operations:** +``` +Same Semantics, Different Code: + a = b + c ≡ a = c + b + if (x == 0) ≡ if (!x) + i++ ≡ i = i + 1 + ptr->field ≡ (*ptr).field +``` + +**Compiler Optimization Differences:** +- Inlining decisions +- Loop unrolling +- Register allocation +- Constant propagation + +### 3. Difference Categories + +| Category | Significance | Example | +|----------|--------------|---------| +| **Cosmetic** | Low | Variable names, whitespace | +| **Optimization** | Low | Compiler choices, register use | +| **Refactoring** | Medium | Code reorganization, extraction | +| **Functional** | High | Different algorithms, new features | +| **Security Patch** | Critical | Bounds checks, validation added | + +### 4. Library Function Identification + +**If functions appear similar to known libraries:** + +Check for signatures of: +- CRT functions (memcpy, strlen, malloc) +- Crypto libraries (AES, SHA implementations) +- Compression (zlib, LZ4) +- Common patterns (linked list ops, hash tables) + +**FLIRT-style matching:** +- First N bytes pattern +- Constant values (magic numbers) +- Call patterns + +### 5. Patch Analysis (If Comparing Versions) + +**Security Patches Often Add:** +```c +// Before (vulnerable) +memcpy(dest, src, len); + +// After (patched) +if (len > sizeof(dest)) return ERROR; // ← Added bounds check +memcpy(dest, src, len); +``` + +**Common Patch Patterns:** +- Added length/bounds validation +- Added NULL pointer checks +- Integer overflow protection +- Changed insecure functions (strcpy → strncpy) + +### 6. Malware Variant Analysis + +**If Comparing Suspected Variants:** + +| Indicator | Meaning | +|-----------|---------| +| Same structure, different strings | Configuration change | +| Same structure, different constants | Key/C2 change | +| Added functions | New capability | +| Removed functions | Slimmed variant | +| Heavy obfuscation changes | Anti-detection update | + +### 7. 
Comparison Techniques + +**Manual Diff:** +1. Align similar code sections +2. Mark additions in green +3. Mark deletions in red +4. Mark modifications in yellow + +**Automated Approaches:** +- BinDiff / Diaphora (Ghidra plugins) +- Instruction-level hashing +- CFG isomorphism +- Semantic similarity scoring + +### 8. Reporting Template + +``` +Comparison: {func1_id} vs {func2_id} + +Similarity Score: XX% + +Key Differences: +1. [Location] - [Description of change] +2. [Location] - [Description of change] + +Classification: +[ ] Same function (cosmetic differences only) +[ ] Optimized/recompiled version +[ ] Refactored version +[ ] Patched version (security fix) +[ ] Different functionality +[ ] Different function entirely + +Notes: +[Your analysis here] +``` + +## Recommended Next Steps +- `analysis_get_callgraph(address="")` - Compare call patterns +- `xrefs_list(address="")` - Compare usage contexts +- `structs_get(name="")` - Compare data structure usage +- Analyze disassembly for instruction-level differences +""", + "context": { + "program_info": program_info, + "function1": { + "name": func1_id, + "info": func1_info, + "lines": func1_lines, + "variables": func1_var_count + }, + "function2": { + "name": func2_id, + "info": func2_info, + "lines": func2_lines, + "variables": func2_var_count + } + } + } + +@mcp.prompt("document_struct") +def document_struct_prompt(name: str, port: int = None): + """A prompt to comprehensively document a data structure + + Analyzes structure usage across the codebase to determine field purposes. 
+ + Args: + name: Structure name to document + port: Specific Ghidra instance port (optional) + """ + port = _get_instance_port(port) + + # Get program info + program_info = ghidra_instance(port=port) + + # Get the structure definition + struct_info = structs_get(name=name, port=port, page_size=100) + + fields = [] + struct_size = 0 + if isinstance(struct_info, dict): + fields = struct_info.get("fields", struct_info.get("items", [])) + struct_size = struct_info.get("size", 0) + + # Search for functions that reference this struct + struct_funcs = functions_list(port=port, grep=name, page_size=20) + related_funcs = [] + if isinstance(struct_funcs, dict): + related_funcs = struct_funcs.get("functions", struct_funcs.get("items", [])) + + # Search for strings that might relate to field names + # (often debug strings reference struct field names) + field_names = [f.get('name', '') for f in fields if f.get('name')] + related_strings = [] + for field_name in field_names[:5]: + if len(field_name) > 3: # Skip very short names + strings = data_list_strings(port=port, grep=field_name, page_size=3) + if isinstance(strings, dict): + related_strings.extend(strings.get("strings", strings.get("items", []))[:2]) + + # Format structure fields + fields_display = "" + if fields: + max_type_len = max(len(str(f.get('type', ''))) for f in fields) if fields else 10 + max_name_len = max(len(str(f.get('name', ''))) for f in fields) if fields else 10 + + fields_display = "\n".join([ + f" +{f.get('offset', 0):04x} {str(f.get('type', 'unknown')).ljust(max_type_len)} {str(f.get('name', 'field_' + str(i))).ljust(max_name_len)} // {f.get('size', '?')} bytes" + for i, f in enumerate(fields) + ]) + else: + fields_display = " (No fields found)" + + # Format related functions + funcs_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in related_funcs[:15] + ]) if related_funcs else "No related functions found" + + # Format related strings + 
strings_display = "\n".join([ + f" {s.get('address', 'N/A')}: \"{str(s.get('value', s.get('string', s)))[:50]}\"" + for s in related_strings[:10] + ]) if related_strings else "No related strings found" + + return { + "prompt": f""" +# Structure Documentation: {name} + +**Binary**: {program_info.get('program_name', 'unknown')} +**Structure Size**: {struct_size} bytes (0x{struct_size:x}) + +## Field Layout +``` +{fields_display} +``` + +## Functions Referencing This Structure +``` +{funcs_display} +``` + +## Related Strings +``` +{strings_display} +``` + +## Documentation Framework + +### 1. Structure Purpose Analysis + +**Determine the struct's role:** +- Is it a configuration structure? +- Is it a protocol message/packet? +- Is it an internal state tracker? +- Is it an API/ABI type? +- Is it a file format header? + +### 2. Field Documentation Template + +For each field, document: + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Field: [name] │ +│ Offset: 0x[offset] Size: [bytes] Type: [type] │ +├─────────────────────────────────────────────────────────────┤ +│ Purpose: [What this field represents] │ +│ Valid Values: [Range, enum values, or constraints] │ +│ Set By: [Function(s) that write this field] │ +│ Used By: [Function(s) that read this field] │ +│ Notes: [Special considerations, endianness, etc.] │ +└─────────────────────────────────────────────────────────────┘ +``` + +### 3. Common Field Patterns + +**Identification Fields:** +- Magic numbers (file/protocol signatures) +- Version fields +- Type/opcode discriminators +- Size/length fields + +**Data Fields:** +- Pointers to dynamic data +- Inline arrays/strings +- Numeric values +- Flags/bitfields + +**Linkage Fields:** +- Next/prev pointers (linked lists) +- Parent/child pointers (trees) +- Hash table chains +- Reference counts + +### 4. 
Bitfield Analysis + +If a field appears to be flags: + +``` +Field: flags (offset 0x10, 4 bytes) + +Bit 0 (0x00000001): INITIALIZED +Bit 1 (0x00000002): CONNECTED +Bit 2 (0x00000004): AUTHENTICATED +Bit 3 (0x00000008): ENCRYPTED +Bits 4-7: Reserved +Bits 8-15: State enum (0-255) +Bits 16-31: Error code +``` + +### 5. Structure Relationship Mapping + +``` + ┌──────────────┐ + │ {name} │ + └──────┬───────┘ + │ + ┌───────────────┼───────────────┐ + ▼ ▼ ▼ + ┌────────────┐ ┌────────────┐ ┌────────────┐ + │ Related 1 │ │ Related 2 │ │ Related 3 │ + └────────────┘ └────────────┘ └────────────┘ +``` + +Document: +- Parent structures (this struct is a field of...) +- Child structures (this struct contains pointers to...) +- Related structures (often used together with...) + +### 6. Memory Layout Visualization + +``` +{name} (0x{struct_size:x} bytes) +┌────────────────────────────────────────┐ 0x0000 +│ │ +│ [field 1] │ +│ │ +├────────────────────────────────────────┤ 0x???? +│ [field 2] │ +├────────────────────────────────────────┤ 0x???? +│ [field 3] │ +│ │ +├────────────────────────────────────────┤ 0x???? +│ ... │ +└────────────────────────────────────────┘ 0x{struct_size:04x} +``` + +### 7. Usage Pattern Analysis + +**Lifecycle:** +1. **Allocation**: How are instances created? +2. **Initialization**: What sets up initial values? +3. **Usage**: How is it passed around and used? +4. **Cleanup**: How is it destroyed/freed? + +**Thread Safety:** +- Is there a mutex/lock field? +- Are accesses atomic? +- Is it passed between threads? + +### 8. Documentation Output Format + +```markdown +## {name} + +**Size**: {struct_size} bytes +**Purpose**: [One-line description] + +### Fields + +| Offset | Type | Name | Description | +|--------|------|------|-------------| +| 0x0000 | uint32 | magic | File signature (0xDEADBEEF) | +| 0x0004 | uint16 | version | Format version (currently 2) | +| ... | ... | ... | ... 
| + +### Related Functions +- `create_{name}()` - Allocator +- `init_{name}()` - Initializer +- `process_{name}()` - Main handler +- `free_{name}()` - Destructor + +### Notes +[Any special considerations, known issues, etc.] +``` + +## Recommended Next Steps +- `functions_decompile(name="")` - See how fields are used +- `xrefs_list(address="")` - Find all references +- `structs_list()` - Find related structures +- For each field: trace reads and writes to understand purpose +""", + "context": { + "program_info": program_info, + "struct_name": name, + "struct_size": struct_size, + "field_count": len(fields), + "fields": fields[:20], + "related_functions": [f.get('name') for f in related_funcs[:10]] + } + } + +@mcp.prompt("find_error_handlers") +async def find_error_handlers_prompt(port: int = None, ctx: Context = None): + """A prompt to map error handling throughout the binary + + Identifies exception handlers, error paths, logging, and cleanup routines. + Reports progress during multi-category scanning. 
+ + Args: + port: Specific Ghidra instance port (optional) + ctx: FastMCP context for progress reporting (auto-injected) + """ + port = _get_instance_port(port) + + # Get program info (total: 11 error + 6 strings + 6 log + 6 cleanup + 5 exit + 1 info = 35) + await report_step(ctx, 1, 35, "Getting program info") + program_info = ghidra_instance(port=port) + + # Search for error-related function names (11 patterns) + error_patterns = [ + "error", "err", "fail", "exception", "abort", "panic", + "fatal", "die", "exit", "cleanup", "handler" + ] + + error_funcs = [] + for idx, pattern in enumerate(error_patterns, start=2): + await report_step(ctx, idx, 35, f"Scanning error functions: '{pattern}'") + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=10) + if isinstance(funcs, dict): + for f in funcs.get("functions", funcs.get("items", []))[:5]: + f['_pattern'] = pattern + error_funcs.append(f) + + # Deduplicate + seen = set() + unique_error = [] + for f in error_funcs: + addr = f.get("address", f.get("entry_point", str(f))) + if addr not in seen: + seen.add(addr) + unique_error.append(f) + + # Search for error-related strings (6 patterns) + error_strings = [] + string_patterns = [ + "error", "failed", "invalid", "cannot", "unable", + "exception", "warning", "fatal", "critical" + ] + + base_step = 2 + len(error_patterns) # 13 + for idx, pattern in enumerate(string_patterns[:6], start=base_step): + await report_step(ctx, idx, 35, f"Scanning error strings: '{pattern}'") + strings = data_list_strings(port=port, grep=pattern, grep_ignorecase=True, page_size=8) + if isinstance(strings, dict): + for s in strings.get("strings", strings.get("items", []))[:4]: + s['_pattern'] = pattern + error_strings.append(s) + + # Search for logging functions (6 patterns) + log_patterns = ["log", "print", "debug", "trace", "syslog", "fprintf"] + log_funcs = [] + base_step = 13 + 6 # 19 + for idx, pattern in enumerate(log_patterns, start=base_step): + await 
report_step(ctx, idx, 35, f"Scanning logging: '{pattern}'") + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=5) + if isinstance(funcs, dict): + log_funcs.extend(funcs.get("functions", funcs.get("items", []))[:3]) + + # Search for cleanup/destructor patterns (6 patterns) + cleanup_patterns = ["cleanup", "destroy", "free", "release", "close", "deinit"] + cleanup_funcs = [] + base_step = 19 + 6 # 25 + for idx, pattern in enumerate(cleanup_patterns, start=base_step): + await report_step(ctx, idx, 35, f"Scanning cleanup: '{pattern}'") + funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=5) + if isinstance(funcs, dict): + cleanup_funcs.extend(funcs.get("functions", funcs.get("items", []))[:3]) + + # Search for exit/abort functions (5 patterns) + exit_patterns = ["exit", "abort", "_Exit", "quick_exit", "terminate"] + exit_funcs = [] + base_step = 25 + 6 # 31 + for idx, pattern in enumerate(exit_patterns, start=base_step): + await report_step(ctx, idx, 35, f"Scanning exit: '{pattern}'") + funcs = functions_list(port=port, grep=f"^{pattern}$|^_{pattern}$", page_size=3) + if isinstance(funcs, dict): + exit_funcs.extend(funcs.get("functions", funcs.get("items", []))[:2]) + + # Format outputs + error_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')} [{f.get('_pattern', '')}]" + for f in unique_error[:20] + ]) if unique_error else "No error handling functions found" + + strings_display = "\n".join([ + f" {s.get('address', 'N/A')}: \"{str(s.get('value', s.get('string', s)))[:50]}\" [{s.get('_pattern', '')}]" + for s in error_strings[:15] + ]) if error_strings else "No error strings found" + + log_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in log_funcs[:10] + ]) if log_funcs else "No logging functions found" + + cleanup_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: 
{f.get('name', 'unknown')}" + for f in cleanup_funcs[:10] + ]) if cleanup_funcs else "No cleanup functions found" + + exit_display = "\n".join([ + f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}" + for f in exit_funcs[:5] + ]) if exit_funcs else "No exit functions found" + + return { + "prompt": f""" +# Error Handling Analysis + +**Binary**: {program_info.get('program_name', 'unknown')} + +## Error Handling Functions +``` +{error_display} +``` + +## Error Messages +``` +{strings_display} +``` + +## Logging Functions +``` +{log_display} +``` + +## Cleanup/Destructor Functions +``` +{cleanup_display} +``` + +## Exit/Abort Functions +``` +{exit_display} +``` + +## Analysis Framework + +### 1. Error Handling Patterns + +**Pattern 1: Return Code Checking** +```c +ret = do_something(); +if (ret < 0) {{ + log_error("do_something failed: %d", ret); + return ret; // Propagate error +}} +``` + +**Pattern 2: Exception-like (goto cleanup)** +```c +int func() {{ + if (!(ptr1 = malloc(...))) goto err1; + if (!(ptr2 = malloc(...))) goto err2; + // ... work ... + return SUCCESS; + +err2: + free(ptr1); +err1: + return ERROR; +}} +``` + +**Pattern 3: C++ Exceptions** +```c +try {{ + riskyOperation(); +}} catch (const std::exception& e) {{ + handleError(e); +}} +``` + +**Pattern 4: Windows SEH** +```c +__try {{ + riskyCode(); +}} __except(EXCEPTION_EXECUTE_HANDLER) {{ + handleException(); +}} +``` + +### 2. Error Propagation Mapping + +``` + Function A + │ + ▼ + ┌─────────────────┐ + │ Function B │◄── Error originates here + └────────┬────────┘ + │ returns ERROR + ▼ + ┌─────────────────┐ + │ Function A │◄── Propagates error + └────────┬────────┘ + │ returns ERROR + ▼ + ┌─────────────────┐ + │ Caller │◄── Handles or propagates + └─────────────────┘ +``` + +### 3. 
Exception Handler Types
+
+**Structured Exception Handling (Windows):**
+- Look for `__try`/`__except`/`__finally`
+- Check for `_except_handler` functions
+- Examine exception filter expressions
+
+**C++ Exception Handling:**
+- `__cxa_throw`, `__cxa_begin_catch`, `__cxa_end_catch`
+- `.eh_frame` and `.gcc_except_table` sections
+- Personality routines (`__gxx_personality_v0`)
+
+**Signal Handlers (Unix):**
+- `signal()`, `sigaction()` setup
+- Custom handlers for SIGSEGV, SIGBUS, etc.
+
+### 4. Error Code Analysis
+
+**Document the error code scheme:**
+```
+Error Code Ranges:
+  0       = Success
+  1-99    = General errors
+  100-199 = File errors
+  200-299 = Network errors
+  300-399 = Authentication errors
+  400-499 = Permission errors
+  -1      = Generic failure
+```
+
+**Common Conventions:**
+| Convention | Success | Failure |
+|------------|---------|---------|
+| Unix style | 0 | -1 or negative |
+| Boolean | 1/true | 0/false |
+| HRESULT | >= 0 | < 0 |
+| errno-based | 0 | errno set |
+
+### 5. Cleanup Path Analysis
+
+**Resource Cleanup Checklist:**
+- [ ] All malloc'd memory freed
+- [ ] All file handles closed
+- [ ] All sockets closed
+- [ ] All mutexes released
+- [ ] All threads joined
+- [ ] All temp files removed
+
+**RAII-style (C++):**
+```cpp
+// Destructor handles cleanup automatically
+unique_ptr<Resource> res = make_unique<Resource>();
+```
+
+**Manual cleanup (C):**
+```c
+// Must explicitly free on every exit path
+if (error) {{
+    free(buffer);
+    close(fd);
+    return -1;
+}}
+```
+
+### 6. Logging Analysis
+
+**Log Levels:**
+```
+TRACE   - Detailed debugging
+DEBUG   - Development info
+INFO    - Normal operation
+WARNING - Potential issues
+ERROR   - Failures (recoverable)
+FATAL   - Unrecoverable (exit)
+```
+
+**Useful Information in Logs:**
+- Error messages reveal expected conditions
+- Debug strings reveal internal state
+- Trace messages reveal execution flow
+- Format strings reveal data structures
+
+### 7. 
Security Implications + +**Error Handling Vulnerabilities:** + +| Issue | Risk | Example | +|-------|------|---------| +| Missing error check | High | Use after failed malloc | +| Error info disclosure | Medium | Stack traces to user | +| Inconsistent cleanup | Medium | Memory leaks, resource exhaustion | +| Error-based oracle | Low | Different errors reveal state | + +### 8. Documentation Output + +``` +Error Handling Map for {program_info.get('program_name', 'unknown')} + +Central Error Handlers: +- handle_error() @ 0x401000 - Main error router +- panic() @ 0x402000 - Fatal error handler + +Error Propagation: + network_read() → connection_handler() → main_loop() + file_parse() → load_config() → init() + +Cleanup Routines: +- cleanup_connection() - Closes sockets, frees buffers +- cleanup_session() - Destroys session state + +Exit Codes: + 0 - Success + 1 - Configuration error + 2 - Network error + 3 - Authentication failure +``` + +## Recommended Next Steps +- `xrefs_list(address="")` - Find error check locations +- `functions_decompile(name="")` - Analyze error processing +- `analysis_get_callgraph(name="")` - Map cleanup flow +- Look for functions with many callees to `exit()` or `abort()` +""", + "context": { + "program_info": program_info, + "error_functions": [f.get('name') for f in unique_error[:15]], + "error_strings": [str(s.get('value', s.get('string', '')))[:40] for s in error_strings[:10]], + "log_functions": [f.get('name') for f in log_funcs[:10]], + "cleanup_functions": [f.get('name') for f in cleanup_funcs[:10]], + "exit_functions": [f.get('name') for f in exit_funcs[:5]] } }