# /// script # requires-python = ">=3.11" # dependencies = [ # "mcp>=1.22.0", # "requests>=2.32.3", # ] # /// # GhydraMCP Bridge for Ghidra HATEOAS API - Optimized for MCP integration # Provides namespaced tools for interacting with Ghidra's reverse engineering capabilities # Features: Cursor-based pagination, grep filtering, session isolation import os import signal import sys import threading import time from threading import Lock from typing import Dict, List, Optional, Union, Any from urllib.parse import quote, urlencode, urlparse import requests from mcp.server.fastmcp import FastMCP, Context # ================= Core Infrastructure ================= ALLOWED_ORIGINS = os.environ.get( "GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",") active_instances: Dict[int, dict] = {} instances_lock = Lock() DEFAULT_GHIDRA_PORT = 8192 DEFAULT_GHIDRA_HOST = "localhost" QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) BRIDGE_VERSION = "2025-12-01" REQUIRED_API_VERSION = 2010 current_instance_port = DEFAULT_GHIDRA_PORT # ================= Cursor-Based Pagination System ================= # Provides efficient pagination with grep filtering for large responses # Inspired by mcplaywright pagination system import re import hashlib import json from dataclasses import dataclass, field from typing import Callable, Iterator from collections import OrderedDict # Configuration CURSOR_TTL_SECONDS = 300 # 5 minutes CURSOR_MAX_CACHE_SIZE = 100 # Maximum number of cached cursors DEFAULT_PAGE_SIZE = 50 MAX_PAGE_SIZE = 500 TOKEN_ESTIMATION_RATIO = 4.0 # Roughly 4 chars per token # ReDoS Protection Configuration MAX_GREP_PATTERN_LENGTH = 500 # Maximum regex pattern length MAX_GREP_REPETITION_OPS = 15 # Maximum repetition operators (* + ? 
# {})  -- continuation of the repetition-operator comment on the previous line
MAX_GREP_RECURSION_DEPTH = 10  # Maximum depth for nested data grep matching


def compile_safe_pattern(pattern: str, flags: int = 0) -> re.Pattern:
    """Compile regex pattern with ReDoS protection

    Validates pattern to prevent catastrophic backtracking attacks.
    Rejects patterns that are too long or have excessive repetition operators.

    Args:
        pattern: Regex pattern string
        flags: Regex compilation flags

    Returns:
        Compiled regex pattern

    Raises:
        ValueError: If pattern fails safety validation
    """
    if not pattern:
        raise ValueError("Empty pattern")

    # Check pattern length
    if len(pattern) > MAX_GREP_PATTERN_LENGTH:
        raise ValueError(
            f"Pattern too long ({len(pattern)} chars, max {MAX_GREP_PATTERN_LENGTH}). "
            "Consider using a simpler pattern or substring match."
        )

    # Count repetition operators that could cause catastrophic backtracking
    # These are the main culprits: nested quantifiers like (a+)+, (a*)*
    repetition_ops = pattern.count('*') + pattern.count('+') + pattern.count('?')
    # Also count bounded repetitions {n,m}
    repetition_ops += len(re.findall(r'\{[0-9,]+\}', pattern))

    if repetition_ops > MAX_GREP_REPETITION_OPS:
        raise ValueError(
            f"Pattern has too many repetition operators ({repetition_ops}, max {MAX_GREP_REPETITION_OPS}). "
            "This could cause performance issues. Consider simplifying the pattern."
        )

    # Check for common dangerous patterns (nested quantifiers)
    # NOTE: heuristic textual check only — it cannot catch every ReDoS shape.
    dangerous_patterns = [
        r'\([^)]*[*+][^)]*\)[*+]',  # (a+)+ or (a*)*
        r'\([^)]*[*+][^)]*\)\{',    # (a+){n,m}
    ]
    for dangerous in dangerous_patterns:
        if re.search(dangerous, pattern):
            raise ValueError(
                "Pattern contains nested quantifiers which could cause exponential backtracking. "
                "Example: (a+)+ is dangerous. Consider using atomic groups or simplifying."
            )

    # Try to compile the pattern
    try:
        return re.compile(pattern, flags)
    except re.error as e:
        raise ValueError(f"Invalid regex pattern: {e}")


@dataclass
class CursorState:
    """Represents the state of a paginated query with session isolation"""
    cursor_id: str        # Unique cursor identifier
    session_id: str       # Session isolation key
    tool_name: str        # Tool that created this cursor
    query_hash: str       # Hash of original query parameters
    data: List[Any]       # Full result set (or filtered)
    total_count: int      # Total items before pagination
    filtered_count: int   # Items after grep filtering
    current_offset: int = 0
    page_size: int = DEFAULT_PAGE_SIZE
    grep_pattern: Optional[str] = None  # annotation tightened: None means "no filter"
    grep_flags: int = 0
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)

    @property
    def is_expired(self) -> bool:
        # TTL measured from last access, not creation, so active cursors stay alive
        return time.time() - self.last_accessed > CURSOR_TTL_SECONDS

    @property
    def has_more(self) -> bool:
        return self.current_offset + self.page_size < self.filtered_count

    @property
    def current_page(self) -> int:
        # 1-based page number
        return (self.current_offset // self.page_size) + 1

    @property
    def total_pages(self) -> int:
        # Ceiling division; at least 1 page even for empty result sets
        return max(1, (self.filtered_count + self.page_size - 1) // self.page_size)

    @property
    def ttl_remaining(self) -> int:
        return max(0, int(CURSOR_TTL_SECONDS - (time.time() - self.last_accessed)))

    def verify_session(self, session_id: str) -> bool:
        """Verify cursor belongs to requesting session"""
        return self.session_id == session_id


class CursorManager:
    """Thread-safe cursor manager with TTL-based expiration and session isolation"""

    def __init__(self):
        # OrderedDict gives LRU ordering via move_to_end/popitem(last=False)
        self._cursors: OrderedDict[str, CursorState] = OrderedDict()
        self._session_cursors: Dict[str, set] = {}  # session_id -> set of cursor_ids
        self._lock = Lock()

    def _generate_cursor_id(self, query_hash: str, session_id: str) -> str:
        """Generate a unique cursor ID"""
        unique = f"{session_id}-{query_hash}-{time.time()}-{id(self)}"
        return hashlib.sha256(unique.encode()).hexdigest()[:16]

    def _cleanup_expired(self):
        """Remove
 expired cursors (call while holding lock)"""
        expired = [cid for cid, state in self._cursors.items() if state.is_expired]
        for cid in expired:
            state = self._cursors[cid]
            # Remove from session tracking
            if state.session_id in self._session_cursors:
                self._session_cursors[state.session_id].discard(cid)
            del self._cursors[cid]

        # Also enforce max cache size (LRU eviction)
        while len(self._cursors) > CURSOR_MAX_CACHE_SIZE:
            oldest_id, oldest_state = self._cursors.popitem(last=False)
            if oldest_state.session_id in self._session_cursors:
                self._session_cursors[oldest_state.session_id].discard(oldest_id)

    def create_cursor(self, data: List[Any], query_params: dict,
                      tool_name: str = "unknown", session_id: str = "default",
                      grep_pattern: Optional[str] = None, grep_flags: int = 0,
                      page_size: int = DEFAULT_PAGE_SIZE) -> tuple[str, CursorState]:
        """Create a new cursor for paginated results

        Args:
            data: The full result set to paginate
            query_params: Original query parameters (for hashing)
            tool_name: Name of tool creating cursor
            session_id: Session identifier for isolation
            grep_pattern: Optional regex pattern to filter results
            grep_flags: Regex flags (re.IGNORECASE, etc.)
            page_size: Items per page

        Returns:
            Tuple of (cursor_id, cursor_state)

        Raises:
            ValueError: If grep_pattern fails ReDoS safety validation
        """
        # Apply grep filtering if pattern provided (with ReDoS protection)
        # NOTE: filtering happens outside the lock on purpose — it may be slow.
        filtered_data = data
        if grep_pattern:
            pattern = compile_safe_pattern(grep_pattern, grep_flags)
            filtered_data = [
                item for item in data
                if self._matches_grep(item, pattern)
            ]

        # Create query hash for deduplication
        query_hash = hashlib.md5(
            json.dumps(query_params, sort_keys=True, default=str).encode()
        ).hexdigest()[:12]

        with self._lock:
            self._cleanup_expired()
            cursor_id = self._generate_cursor_id(query_hash, session_id)
            state = CursorState(
                cursor_id=cursor_id,
                session_id=session_id,
                tool_name=tool_name,
                query_hash=query_hash,
                data=filtered_data,
                total_count=len(data),
                filtered_count=len(filtered_data),
                page_size=min(page_size, MAX_PAGE_SIZE),
                grep_pattern=grep_pattern,
                grep_flags=grep_flags
            )
            self._cursors[cursor_id] = state
            # Track by session
            if session_id not in self._session_cursors:
                self._session_cursors[session_id] = set()
            self._session_cursors[session_id].add(cursor_id)
            return cursor_id, state

    def get_cursor(self, cursor_id: str, session_id: Optional[str] = None) -> Optional[CursorState]:
        """Retrieve a cursor by ID, optionally validating session

        Args:
            cursor_id: The cursor identifier
            session_id: Optional session to validate against

        Returns:
            CursorState if found and valid, None otherwise
        """
        with self._lock:
            self._cleanup_expired()
            if cursor_id not in self._cursors:
                return None
            state = self._cursors[cursor_id]
            if state.is_expired:
                del self._cursors[cursor_id]
                if state.session_id in self._session_cursors:
                    self._session_cursors[state.session_id].discard(cursor_id)
                return None
            # Validate session if provided
            if session_id and not state.verify_session(session_id):
                return None
            state.last_accessed = time.time()
            # Move to end (most recently used)
            self._cursors.move_to_end(cursor_id)
            return state

    def advance_cursor(self, cursor_id: str, session_id: Optional[str] = None) -> Optional[CursorState]:
        """Advance cursor to next page

        Args:
            cursor_id: The cursor identifier
            session_id: Optional session to validate against

        Returns:
            Updated CursorState or None if invalid/expired
        """
        with self._lock:
            state = self._cursors.get(cursor_id)
            if not state or state.is_expired:
                return None
            if session_id and not state.verify_session(session_id):
                return None
            state.current_offset += state.page_size
            state.last_accessed = time.time()
            self._cursors.move_to_end(cursor_id)
            return state

    def delete_cursor(self, cursor_id: str, session_id: Optional[str] = None) -> bool:
        """Explicitly delete a cursor

        Args:
            cursor_id: The cursor identifier
            session_id: Optional session to validate against

        Returns:
            True if deleted, False if not found or session mismatch
        """
        with self._lock:
            if cursor_id not in self._cursors:
                return False
            state = self._cursors[cursor_id]
            if session_id and not state.verify_session(session_id):
                return False
            if state.session_id in self._session_cursors:
                self._session_cursors[state.session_id].discard(cursor_id)
            del self._cursors[cursor_id]
            return True

    def delete_session_cursors(self, session_id: str) -> int:
        """Delete all cursors for a session

        Args:
            session_id: The session identifier

        Returns:
            Number of cursors deleted
        """
        with self._lock:
            if session_id not in self._session_cursors:
                return 0
            cursor_ids = list(self._session_cursors[session_id])
            count = 0
            for cid in cursor_ids:
                if cid in self._cursors:
                    del self._cursors[cid]
                    count += 1
            del self._session_cursors[session_id]
            return count

    def get_page(self, state: CursorState) -> List[Any]:
        """Get current page of data from cursor state"""
        start = state.current_offset
        end = start + state.page_size
        return state.data[start:end]

    def _matches_grep(self, item: Any, pattern: re.Pattern, depth: int = 0) -> bool:
        """Check if an item matches the grep pattern

        Searches through string representations of dict values, list items,
        or the item itself.

        Args:
            item: The item to search
            pattern: Compiled regex pattern
            depth: Current recursion depth (for stack overflow protection)

        Returns:
            True if pattern matches anywhere in the item
        """
        # Prevent stack overflow from deeply nested structures
        if depth > MAX_GREP_RECURSION_DEPTH:
            return False

        if isinstance(item, dict):
            # Search all string values in the dict (recursively).
            # Dict KEYS are intentionally not matched — only values.
            for key, value in item.items():
                if isinstance(value, str) and pattern.search(value):
                    return True
                elif isinstance(value, (int, float)):
                    if pattern.search(str(value)):
                        return True
                elif isinstance(value, dict):
                    if self._matches_grep(value, pattern, depth + 1):
                        return True
                elif isinstance(value, (list, tuple)):
                    if self._matches_grep(value, pattern, depth + 1):
                        return True
            return False
        elif isinstance(item, (list, tuple)):
            return any(self._matches_grep(i, pattern, depth + 1) for i in item)
        elif isinstance(item, str):
            return bool(pattern.search(item))
        else:
            # Fall back to matching against str() of any other type
            return bool(pattern.search(str(item)))

    def list_cursors(self, session_id: Optional[str] = None) -> List[dict]:
        """List active cursors, optionally filtered by session

        Args:
            session_id: Optional session filter

        Returns:
            List of cursor info dicts
        """
        with self._lock:
            self._cleanup_expired()
            return [
                {
                    "cursor_id": cid,
                    "session_id": state.session_id,
                    "tool_name": state.tool_name,
                    "total_count": state.total_count,
                    "filtered_count": state.filtered_count,
                    "current_page": state.current_page,
                    "total_pages": state.total_pages,
                    "current_offset": state.current_offset,
                    "page_size": state.page_size,
                    "has_more": state.has_more,
                    "grep_pattern": state.grep_pattern,
                    "age_seconds": int(time.time() - state.created_at),
                    "ttl_remaining": state.ttl_remaining
                }
                for cid, state in self._cursors.items()
                if session_id is None or state.session_id == session_id
            ]

    def get_stats(self) -> dict:
        """Get cursor manager statistics"""
        with self._lock:
            self._cleanup_expired()
            return {
                "total_cursors": len(self._cursors),
                "total_sessions": len(self._session_cursors),
                "max_cache_size":
                    CURSOR_MAX_CACHE_SIZE,
                "ttl_seconds": CURSOR_TTL_SECONDS,
                "cursors_per_session": {
                    sid: len(cids) for sid, cids in self._session_cursors.items()
                }
            }


# Global cursor manager instance
cursor_manager = CursorManager()


def estimate_tokens(data: List[Any]) -> int:
    """Estimate token count for a list of items

    Rough heuristic: serialized JSON length divided by TOKEN_ESTIMATION_RATIO
    (~4 chars per token). Good enough for size warnings, not for billing.
    """
    text = json.dumps(data, default=str)
    return int(len(text) / TOKEN_ESTIMATION_RATIO)


def paginate_response(data: List[Any], query_params: dict,
                      tool_name: str = "unknown", session_id: str = "default",
                      page_size: int = DEFAULT_PAGE_SIZE, grep: Optional[str] = None,
                      grep_ignorecase: bool = True, return_all: bool = False) -> dict:
    """Create a paginated response with optional grep filtering

    Args:
        data: Full result list to paginate
        query_params: Original query parameters (for cursor creation)
        tool_name: Name of the tool creating this response
        session_id: Session identifier for cursor isolation
        page_size: Items per page (default: 50, max: 500)
        grep: Optional regex pattern to filter results
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Bypass pagination and return all results (with warning)

    Returns:
        dict with pagination metadata and results
    """
    grep_flags = re.IGNORECASE if grep_ignorecase else 0

    # Handle return_all bypass
    if return_all:
        # Apply grep filtering even for return_all
        filtered_data = data
        if grep:
            try:
                pattern = compile_safe_pattern(grep, grep_flags)
                filtered_data = [
                    item for item in data
                    if cursor_manager._matches_grep(item, pattern)
                ]
            except ValueError as e:
                # Unsafe/invalid pattern rejected by compile_safe_pattern
                return {
                    "success": False,
                    "error": {
                        "code": "INVALID_GREP_PATTERN",
                        "message": str(e)
                    },
                    "timestamp": int(time.time() * 1000)
                }

        estimated_tokens = estimate_tokens(filtered_data)
        warning = None
        if estimated_tokens > 50000:
            warning = f"🚨 EXTREMELY LARGE response (~{estimated_tokens:,} tokens) - may cause issues"
        elif estimated_tokens > 20000:
            warning = f"⚠️ VERY LARGE response (~{estimated_tokens:,} tokens) - consider using pagination"
        elif estimated_tokens > 8000:
            warning = f"⚠️ Large response (~{estimated_tokens:,} tokens)"

        return {
            "success": True,
            "result": filtered_data,
            "pagination": {
                "bypassed": True,
                "total_count": len(data),
                "filtered_count": len(filtered_data),
                "grep_pattern": grep,
                "estimated_tokens": estimated_tokens,
                "warning": warning
            },
            "timestamp": int(time.time() * 1000)
        }

    # Normal pagination flow
    try:
        cursor_id, state = cursor_manager.create_cursor(
            data=data,
            query_params=query_params,
            tool_name=tool_name,
            session_id=session_id,
            grep_pattern=grep,
            grep_flags=grep_flags,
            page_size=page_size
        )
    except ValueError as e:
        return {
            "success": False,
            "error": {
                "code": "INVALID_GREP_PATTERN",
                "message": str(e)
            },
            "timestamp": int(time.time() * 1000)
        }

    current_page = cursor_manager.get_page(state)

    # Only include cursor_id if there are more pages
    response_cursor = cursor_id if state.has_more else None

    # Build response with prominent continuation message for LLMs
    response = {
        "success": True,
        "result": current_page,
        "pagination": {
            "cursor_id": response_cursor,
            "session_id": session_id,
            "total_count": state.total_count,
            "filtered_count": state.filtered_count,
            "page_size": state.page_size,
            "current_page": state.current_page,
            "total_pages": state.total_pages,
            "has_more": state.has_more,
            "grep_pattern": grep,
            "items_returned": len(current_page),
        },
        "timestamp": int(time.time() * 1000)
    }

    # Add prominent message for LLMs when more data is available
    if state.has_more:
        # Items consumed so far = current_page * page_size (current_page is 1-based)
        remaining = state.filtered_count - (state.current_page * state.page_size)
        response["_message"] = (
            f"📄 Showing {len(current_page)} of {state.filtered_count} items "
            f"(page {state.current_page}/{state.total_pages}). "
            f"To get the next {min(state.page_size, remaining)} items, call: "
            f"cursor_next(cursor_id='{cursor_id}')"
        )
    else:
        response["_message"] = f"✅ Complete: {len(current_page)} items returned (all results)"

    return response

# ================= End Cursor System =================

instructions = """
GhydraMCP allows interacting with multiple Ghidra SRE instances.
Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware. First, run `instances_list()` to see all available Ghidra instances (automatically discovers instances on the default host). Then use `instances_use(port)` to set your working instance. Note: Use `instances_discover(host)` only if you need to scan a different host. The API is organized into namespaces for different types of operations: - instances_* : For managing Ghidra instances - functions_* : For working with functions - data_* : For working with data items - structs_* : For creating and managing struct data types - memory_* : For memory access - xrefs_* : For cross-references - analysis_* : For program analysis - cursor_* : For pagination cursor management ## Pagination System The following tools support cursor-based pagination with grep filtering: - `functions_list` - List functions (can be 10K+) - `functions_decompile` - Decompiled code lines (grep for patterns like "if.*NULL") - `functions_disassemble` - Assembly instructions (grep for "CALL", "JMP", etc.) - `functions_get_variables` - Function variables (grep for "local_", "param", etc.) - `data_list` - List data items - `data_list_strings` - List string data - `xrefs_list` - List cross-references (can be very large for common functions) - `structs_list` - List struct types - `structs_get` - Struct fields (grep for field names/types in large structs) - `analysis_get_callgraph` - Call graph edges (grep for function names) - `analysis_get_dataflow` - Data flow steps (grep for opcodes/registers) Pagination parameters: - `page_size`: Items per page (default: 50, max: 500) - `grep`: Regex pattern to filter results (e.g., "main|init", "FUN_00.*") - `grep_ignorecase`: Case-insensitive grep (default: True) - `return_all`: Bypass pagination and return all results (use with caution) When results are paginated, the response includes a `_message` field with instructions. Use `cursor_next(cursor_id)` to fetch the next page of results. 
Use `cursor_list()` to see active cursors. Use `cursor_delete(cursor_id)` to clean up cursors. """ mcp = FastMCP("GhydraMCP", instructions=instructions) ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST) # Helper function to get the current instance or validate a specific port def _get_instance_port(port=None): """Internal helper to get the current instance port or validate a specific port""" port = port or current_instance_port # Validate that the instance exists and is active if port not in active_instances: # Try to register it if not found register_instance(port) if port not in active_instances: raise ValueError(f"No active Ghidra instance on port {port}") return port # The rest of the utility functions (HTTP helpers, etc.) remain the same... def get_instance_url(port: int) -> str: """Get URL for a Ghidra instance by port""" with instances_lock: if port in active_instances: return active_instances[port]["url"] if 8192 <= port <= 65535: register_instance(port) if port in active_instances: return active_instances[port]["url"] return f"http://{ghidra_host}:{port}" def validate_origin(headers: dict) -> bool: """Validate request origin against allowed origins""" origin = headers.get("Origin") if not origin: # No origin header - allow (browser same-origin policy applies) return True # Parse origin to get scheme+hostname try: parsed = urlparse(origin) origin_base = f"{parsed.scheme}://{parsed.hostname}" if parsed.port: origin_base += f":{parsed.port}" except: return False return origin_base in ALLOWED_ORIGINS def _make_request(method: str, port: int, endpoint: str, params: dict = None, json_data: dict = None, data: str = None, headers: dict = None) -> dict: """Internal helper to make HTTP requests and handle common errors.""" url = f"{get_instance_url(port)}/{endpoint}" # Set up headers according to HATEOAS API expected format request_headers = { 'Accept': 'application/json', 'X-Request-ID': f"mcp-bridge-{int(time.time() * 1000)}" } if headers: 
        request_headers.update(headers)

    # CSRF-style guard: state-changing verbs must pass origin validation.
    is_state_changing = method.upper() in ["POST", "PUT", "PATCH", "DELETE"]
    if is_state_changing:
        # Headers may arrive embedded in the JSON payload (legacy callers) or
        # as the explicit headers argument.
        check_headers = json_data.get("headers", {}) if isinstance(
            json_data, dict) else (headers or {})
        if not validate_origin(check_headers):
            return {
                "success": False,
                "error": {
                    "code": "ORIGIN_NOT_ALLOWED",
                    "message": "Origin not allowed for state-changing request"
                },
                "status_code": 403,
                "timestamp": int(time.time() * 1000)
            }

    if json_data is not None:
        request_headers['Content-Type'] = 'application/json'
    elif data is not None:
        request_headers['Content-Type'] = 'text/plain'

    try:
        response = requests.request(
            method,
            url,
            params=params,
            json=json_data,
            data=data,
            headers=request_headers,
            timeout=10
        )
        try:
            parsed_json = response.json()
            # Add timestamp if not present
            if isinstance(parsed_json, dict) and "timestamp" not in parsed_json:
                parsed_json["timestamp"] = int(time.time() * 1000)
            # Check for HATEOAS compliant error response format and reformat if needed
            if not response.ok and isinstance(parsed_json, dict) and "success" in parsed_json and not parsed_json["success"]:
                # Check if error is in the expected HATEOAS format
                if "error" in parsed_json and not isinstance(parsed_json["error"], dict):
                    # Convert string error to the proper format
                    error_message = parsed_json["error"]
                    parsed_json["error"] = {
                        "code": f"HTTP_{response.status_code}",
                        "message": error_message
                    }
            return parsed_json
        except ValueError:
            # Body was not JSON (response.json() raises ValueError)
            if response.ok:
                return {
                    "success": False,
                    "error": {
                        "code": "NON_JSON_RESPONSE",
                        "message": "Received non-JSON success response from Ghidra plugin"
                    },
                    "status_code": response.status_code,
                    "response_text": response.text[:500],
                    "timestamp": int(time.time() * 1000)
                }
            else:
                return {
                    "success": False,
                    "error": {
                        "code": f"HTTP_{response.status_code}",
                        "message": f"Non-JSON error response: {response.text[:100]}..."
                    },
                    "status_code": response.status_code,
                    "response_text": response.text[:500],
                    "timestamp": int(time.time() * 1000)
                }
    except requests.exceptions.Timeout:
        return {
            "success": False,
            "error": {
                "code": "REQUEST_TIMEOUT",
                "message": "Request timed out"
            },
            "status_code": 408,
            "timestamp": int(time.time() * 1000)
        }
    except requests.exceptions.ConnectionError:
        return {
            "success": False,
            "error": {
                "code": "CONNECTION_ERROR",
                "message": f"Failed to connect to Ghidra instance at {url}"
            },
            "status_code": 503,
            "timestamp": int(time.time() * 1000)
        }
    except Exception as e:
        return {
            "success": False,
            "error": {
                "code": "UNEXPECTED_ERROR",
                "message": f"An unexpected error occurred: {str(e)}"
            },
            "exception": e.__class__.__name__,
            "timestamp": int(time.time() * 1000)
        }


def safe_get(port: int, endpoint: str, params: dict = None) -> dict:
    """Make GET request to Ghidra instance"""
    return _make_request("GET", port, endpoint, params=params)


def safe_put(port: int, endpoint: str, data: dict) -> dict:
    """Make PUT request to Ghidra instance with JSON payload

    NOTE(review): pop() mutates the caller's dict by removing "headers".
    """
    headers = data.pop("headers", None) if isinstance(data, dict) else None
    return _make_request("PUT", port, endpoint, json_data=data, headers=headers)


def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict:
    """Perform a POST request to a specific Ghidra instance with JSON or text payload"""
    headers = None
    json_payload = None
    text_payload = None
    # dict payloads go as JSON; anything else is sent as plain text
    if isinstance(data, dict):
        headers = data.pop("headers", None)
        json_payload = data
    else:
        text_payload = data
    return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers)


def safe_patch(port: int, endpoint: str, data: dict) -> dict:
    """Perform a PATCH request to a specific Ghidra instance with JSON payload"""
    headers = data.pop("headers", None) if isinstance(data, dict) else None
    return _make_request("PATCH", port, endpoint, json_data=data, headers=headers)


def safe_delete(port: int, endpoint: str) -> dict:
    """Perform a DELETE request
to a specific Ghidra instance""" return _make_request("DELETE", port, endpoint) def simplify_response(response: dict) -> dict: """ Simplify HATEOAS response data for easier AI agent consumption - Removes _links from result entries - Flattens nested structures when appropriate - Preserves important metadata - Converts structured data like disassembly to text for easier consumption """ if not isinstance(response, dict): return response # Make a copy to avoid modifying the original result = response.copy() # Store API response metadata api_metadata = {} for key in ["id", "instance", "timestamp", "size", "offset", "limit"]: if key in result: api_metadata[key] = result.get(key) # Simplify the main result data if present if "result" in result: # Handle array results if isinstance(result["result"], list): simplified_items = [] for item in result["result"]: if isinstance(item, dict): # Store but remove HATEOAS links from individual items item_copy = item.copy() links = item_copy.pop("_links", None) # Optionally store direct href links as more accessible properties # This helps AI agents navigate the API without understanding HATEOAS if isinstance(links, dict): for link_name, link_data in links.items(): if isinstance(link_data, dict) and "href" in link_data: item_copy[f"{link_name}_url"] = link_data["href"] simplified_items.append(item_copy) else: simplified_items.append(item) result["result"] = simplified_items # Handle object results elif isinstance(result["result"], dict): result_copy = result["result"].copy() # Store but remove links from result object links = result_copy.pop("_links", None) # Add direct href links for easier navigation if isinstance(links, dict): for link_name, link_data in links.items(): if isinstance(link_data, dict) and "href" in link_data: result_copy[f"{link_name}_url"] = link_data["href"] # Special case for disassembly - convert to text for easier consumption if "instructions" in result_copy and isinstance(result_copy["instructions"], list): 
disasm_text = "" for instr in result_copy["instructions"]: if isinstance(instr, dict): addr = instr.get("address", "") mnemonic = instr.get("mnemonic", "") operands = instr.get("operands", "") bytes_str = instr.get("bytes", "") # Format: address: bytes mnemonic operands disasm_text += f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n" # Add the text representation result_copy["disassembly_text"] = disasm_text # Remove the original structured instructions to simplify the response result_copy.pop("instructions", None) # Special case for decompiled code - make sure it's directly accessible if "ccode" in result_copy: result_copy["decompiled_text"] = result_copy["ccode"] elif "decompiled" in result_copy: result_copy["decompiled_text"] = result_copy["decompiled"] result["result"] = result_copy # Store but remove HATEOAS links from the top level links = result.pop("_links", None) # Add direct href links in a more accessible format if isinstance(links, dict): api_links = {} for link_name, link_data in links.items(): if isinstance(link_data, dict) and "href" in link_data: api_links[link_name] = link_data["href"] # Add simplified links if api_links: result["api_links"] = api_links # Restore API metadata for key, value in api_metadata.items(): if key not in result: result[key] = value return result def register_instance(port: int, url: str = None) -> str: """Register a new Ghidra instance Args: port: Port number of the Ghidra instance url: Optional URL if different from default http://host:port Returns: str: Confirmation message or error """ if url is None: url = f"http://{ghidra_host}:{port}" try: # Check for HATEOAS API by checking plugin-version endpoint test_url = f"{url}/plugin-version" response = requests.get(test_url, timeout=2) if not response.ok: return f"Error: Instance at {url} is not responding properly to HATEOAS API" project_info = {"url": url} try: # Check plugin version to ensure compatibility try: version_data = response.json() if "result" in 
version_data: result = version_data["result"] if isinstance(result, dict): plugin_version = result.get("plugin_version", "") api_version = result.get("api_version", 0) project_info["plugin_version"] = plugin_version project_info["api_version"] = api_version # Verify API version compatibility if api_version != REQUIRED_API_VERSION: error_msg = f"API version mismatch: Plugin reports version {api_version}, but bridge requires version {REQUIRED_API_VERSION}" print(error_msg, file=sys.stderr) return error_msg print(f"Connected to Ghidra plugin version {plugin_version} with API version {api_version}") except Exception as e: print(f"Error parsing plugin version: {e}", file=sys.stderr) # Get program info from HATEOAS API info_url = f"{url}/program" try: info_response = requests.get(info_url, timeout=2) if info_response.ok: try: info_data = info_response.json() if "result" in info_data: result = info_data["result"] if isinstance(result, dict): # Extract project and file from programId (format: "project:/file") program_id = result.get("programId", "") if ":" in program_id: project_name, file_path = program_id.split(":", 1) project_info["project"] = project_name # Remove leading slash from file path if present if file_path.startswith("/"): file_path = file_path[1:] project_info["path"] = file_path # Get file name directly from the result project_info["file"] = result.get("name", "") # Get other metadata project_info["language_id"] = result.get("languageId", "") project_info["compiler_spec_id"] = result.get("compilerSpecId", "") project_info["image_base"] = result.get("image_base", "") # Store _links from result for HATEOAS navigation if "_links" in result: project_info["_links"] = result.get("_links", {}) except Exception as e: print(f"Error parsing info endpoint: {e}", file=sys.stderr) except Exception as e: print(f"Error connecting to info endpoint: {e}", file=sys.stderr) except Exception: # Non-critical, continue with registration even if project info fails pass with 
def _discover_instances(port_range, host=None, timeout=0.5) -> dict:
    """Scan a port range for NEW Ghidra instances.

    Only ports not already present in the active_instances registry are
    probed. Use instances_discover() / instances_list() for a complete view
    that includes already-known instances.

    Args:
        port_range: Iterable of port numbers to probe.
        host: Host to scan (defaults to the configured ghidra_host).
        timeout: Per-port HTTP timeout in seconds.

    Returns:
        dict: {"found": <count>, "instances": [<per-instance info>, ...]}
    """
    found_instances = []
    scan_host = host if host is not None else ghidra_host
    # Snapshot the known ports under the lock so membership checks stay
    # consistent while periodic_discovery mutates the registry concurrently.
    with instances_lock:
        known_ports = set(active_instances)
    for port in port_range:
        if port in known_ports:
            continue  # Skip already known instances
        url = f"http://{scan_host}:{port}"
        try:
            # Probe the HATEOAS API via the plugin-version endpoint.
            response = requests.get(
                f"{url}/plugin-version",
                headers={'Accept': 'application/json',
                         'X-Request-ID': f"discovery-{int(time.time() * 1000)}"},
                timeout=timeout)
            if response.ok:
                try:
                    json_data = response.json()
                    if "success" in json_data and json_data["success"] and "result" in json_data:
                        # Valid HATEOAS envelope: register, then collect extra
                        # details for a richer discovery report.
                        registration = register_instance(port, url)
                        instance_info = {"port": port, "url": url}
                        result_payload = json_data["result"]
                        if isinstance(result_payload, dict):
                            instance_info["plugin_version"] = result_payload.get("plugin_version", "unknown")
                            instance_info["api_version"] = result_payload.get("api_version", "unknown")
                        else:
                            instance_info["plugin_version"] = "unknown"
                            instance_info["api_version"] = "unknown"
                        # Include project details recorded by register_instance.
                        with instances_lock:
                            registered = active_instances.get(port)
                            if registered is not None:
                                instance_info["project"] = registered.get("project", "")
                                instance_info["file"] = registered.get("file", "")
                        instance_info["result"] = registration
                        found_instances.append(instance_info)
                except (ValueError, KeyError):
                    # Not valid JSON, or missing the expected envelope keys.
                    print(f"Port {port} returned non-HATEOAS response", file=sys.stderr)
                    continue
        except requests.exceptions.RequestException:
            # Instance not available, just continue
            continue
    return {
        "found": len(found_instances),
        "instances": found_instances
    }


def periodic_discovery():
    """Background loop: discover new instances and prune unreachable ones.

    Intended to run on a daemon thread. Every 30 seconds it scans the full
    port range, health-checks each registered instance, and refreshes its
    program metadata (project name, file, language, etc.).
    """
    while True:
        try:
            _discover_instances(FULL_DISCOVERY_RANGE, timeout=0.5)
            with instances_lock:
                ports_to_remove = []
                for port, info in active_instances.items():
                    url = info["url"]
                    try:
                        # Health check via the plugin-version endpoint.
                        response = requests.get(f"{url}/plugin-version", timeout=1)
                        if not response.ok:
                            ports_to_remove.append(port)
                            continue
                        # Refresh program info (best-effort; non-critical).
                        try:
                            info_response = requests.get(f"{url}/program", timeout=1)
                            if info_response.ok:
                                try:
                                    info_data = info_response.json()
                                    result = info_data.get("result")
                                    if isinstance(result, dict):
                                        # programId has the form "project:/file".
                                        program_id = result.get("programId", "")
                                        if ":" in program_id:
                                            project_name, file_path = program_id.split(":", 1)
                                            info["project"] = project_name
                                            # Drop a leading slash from the file path.
                                            if file_path.startswith("/"):
                                                file_path = file_path[1:]
                                            info["path"] = file_path
                                        # File name and other metadata come
                                        # straight from the result payload.
                                        info["file"] = result.get("name", "")
                                        info["language_id"] = result.get("languageId", "")
                                        info["compiler_spec_id"] = result.get("compilerSpecId", "")
                                        info["image_base"] = result.get("image_base", "")
                                except Exception as e:
                                    print(f"Error parsing info endpoint during discovery: {e}", file=sys.stderr)
                        except Exception:
                            # Metadata refresh is best-effort; keep the instance.
                            pass
                    except requests.exceptions.RequestException:
                        ports_to_remove.append(port)
                for port in ports_to_remove:
                    del active_instances[port]
                    # BUG FIX: log to stderr — stdout carries the MCP stdio
                    # protocol and must stay clean of diagnostics.
                    print(f"Removed unreachable instance on port {port}", file=sys.stderr)
        except Exception as e:
            # BUG FIX: stderr for the same reason as above.
            print(f"Error in periodic discovery: {e}", file=sys.stderr)
        time.sleep(30)
def handle_sigint(signum, frame):
    """SIGINT handler: hard-exit immediately, bypassing cleanup/atexit."""
    os._exit(0)


# ================= MCP Resources =================
# Resources provide information that can be loaded directly into context
# They focus on data and minimize metadata

@mcp.resource(uri="/instance/{port}")
def ghidra_instance(port: int = None) -> dict:
    """Get detailed information about a Ghidra instance and the loaded program

    Args:
        port: Specific Ghidra instance port (optional, uses current if omitted)

    Returns:
        dict: Detailed information about the Ghidra instance and loaded program
    """
    port = _get_instance_port(port)
    response = safe_get(port, "program")
    if not isinstance(response, dict) or not response.get("success", False):
        return {"error": f"Unable to access Ghidra instance on port {port}"}

    # Extract only the most relevant information for the resource
    result = response.get("result", {})
    if not isinstance(result, dict):
        return {
            "success": False,
            "error": {
                "code": "INVALID_RESPONSE",
                "message": "Invalid response format from Ghidra instance"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_info = {
        "port": port,
        "url": get_instance_url(port),
        "program_name": result.get("name", "unknown"),
        "program_id": result.get("programId", "unknown"),
        "language": result.get("languageId", "unknown"),
        "compiler": result.get("compilerSpecId", "unknown"),
        "base_address": result.get("imageBase", "0x0"),
        "memory_size": result.get("memorySize", 0),
        "analysis_complete": result.get("analysisComplete", False)
    }
    # BUG FIX: the port may answer HTTP without being in the registry, so an
    # unchecked active_instances[port] could raise KeyError. Read under the
    # lock and tolerate a missing entry.
    with instances_lock:
        registered = active_instances.get(port)
    if registered and "project" in registered:
        instance_info["project"] = registered["project"]
    return instance_info


def _decompiled_code_or_error(simplified) -> str:
    """Extract decompiled C text from a simplified API response.

    Shared by the by-address and by-name decompile resources (previously
    duplicated inline). Returns the code string on success, otherwise a
    human-readable "Error: ..." message.
    """
    if (not isinstance(simplified, dict)
            or not simplified.get("success", False)
            or "result" not in simplified):
        error_message = "Error: Could not decompile function"
        if isinstance(simplified, dict) and "error" in simplified:
            if isinstance(simplified["error"], dict):
                error_message = simplified["error"].get("message", error_message)
            else:
                error_message = str(simplified["error"])
        return error_message
    result = simplified["result"]
    # Different endpoints may return the code in different fields; try each.
    if isinstance(result, dict):
        for key in ("decompiled_text", "ccode", "decompiled"):
            if key in result:
                return result[key]
    return "Error: Could not extract decompiled code from response"


@mcp.resource(uri="/instance/{port}/function/decompile/address/{address}")
def decompiled_function_by_address(port: int = None, address: str = None) -> str:
    """Get decompiled C code for a function by address

    Args:
        port: Specific Ghidra instance port
        address: Function address in hex format

    Returns:
        str: The decompiled C code as a string, or error message
    """
    if not address:
        return "Error: Address parameter is required"
    port = _get_instance_port(port)
    params = {"syntax_tree": "false", "style": "normalize"}
    response = safe_get(port, f"functions/{address}/decompile", params)
    return _decompiled_code_or_error(simplify_response(response))


@mcp.resource(uri="/instance/{port}/function/decompile/name/{name}")
def decompiled_function_by_name(port: int = None, name: str = None) -> str:
    """Get decompiled C code for a function by name

    Args:
        port: Specific Ghidra instance port
        name: Function name

    Returns:
        str: The decompiled C code as a string, or error message
    """
    if not name:
        return "Error: Name parameter is required"
    port = _get_instance_port(port)
    params = {"syntax_tree": "false", "style": "normalize"}
    response = safe_get(port, f"functions/by-name/{quote(name)}/decompile", params)
    return _decompiled_code_or_error(simplify_response(response))
def _fetch_function_info(port: int, endpoint: str) -> dict:
    """Fetch function metadata from *endpoint* on the given instance.

    Shared by the by-address and by-name info resources (previously
    duplicated inline). Returns the bare function data on success, or a
    structured FUNCTION_NOT_FOUND error envelope on failure.
    """
    response = safe_get(port, endpoint)
    simplified = simplify_response(response)
    if (not isinstance(simplified, dict)
            or not simplified.get("success", False)
            or "result" not in simplified):
        return {
            "success": False,
            "error": {
                "code": "FUNCTION_NOT_FOUND",
                "message": "Could not get function information",
                "details": simplified.get("error") if isinstance(simplified, dict) else None
            },
            "timestamp": int(time.time() * 1000)
        }
    # Return just the function data without API metadata
    return simplified["result"]


@mcp.resource(uri="/instance/{port}/function/info/address/{address}")
def function_info_by_address(port: int = None, address: str = None) -> dict:
    """Get detailed information about a function by address

    Args:
        port: Specific Ghidra instance port
        address: Function address in hex format

    Returns:
        dict: Complete function information including signature, parameters, etc.
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }
    port = _get_instance_port(port)
    return _fetch_function_info(port, f"functions/{address}")


@mcp.resource(uri="/instance/{port}/function/info/name/{name}")
def function_info_by_name(port: int = None, name: str = None) -> dict:
    """Get detailed information about a function by name

    Args:
        port: Specific Ghidra instance port
        name: Function name

    Returns:
        dict: Complete function information including signature, parameters, etc.
    """
    if not name:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Name parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }
    port = _get_instance_port(port)
    return _fetch_function_info(port, f"functions/by-name/{quote(name)}")
def _disassembly_text_or_error(simplified) -> str:
    """Extract a formatted disassembly listing from a simplified response.

    Shared by the by-address and by-name disassembly resources (previously
    duplicated inline). Tries, in order: a pre-formatted disassembly_text
    field, a raw instructions list (formatted here), and a plain
    disassembly field. Returns an "Error: ..." string on failure.
    """
    if (not isinstance(simplified, dict)
            or not simplified.get("success", False)
            or "result" not in simplified):
        error_message = "Error: Could not get disassembly"
        if isinstance(simplified, dict) and "error" in simplified:
            if isinstance(simplified["error"], dict):
                error_message = simplified["error"].get("message", error_message)
            else:
                error_message = str(simplified["error"])
        return error_message
    result = simplified["result"]
    if isinstance(result, dict) and "disassembly_text" in result:
        return result["disassembly_text"]
    if (isinstance(result, dict) and "instructions" in result
            and isinstance(result["instructions"], list)):
        # Format: address: bytes mnemonic operands
        lines = []
        for instr in result["instructions"]:
            if isinstance(instr, dict):
                addr = instr.get("address", "")
                mnemonic = instr.get("mnemonic", "")
                operands = instr.get("operands", "")
                bytes_str = instr.get("bytes", "")
                lines.append(f"{addr}: {bytes_str.ljust(10)} {mnemonic} {operands}\n")
        # join avoids the quadratic cost of repeated string concatenation
        return "".join(lines)
    if isinstance(result, dict) and "disassembly" in result:
        return result["disassembly"]
    return "Error: Could not extract disassembly from response"


@mcp.resource(uri="/instance/{port}/function/disassembly/address/{address}")
def disassembly_by_address(port: int = None, address: str = None) -> str:
    """Get disassembled instructions for a function by address

    Args:
        port: Specific Ghidra instance port
        address: Function address in hex format

    Returns:
        str: Formatted disassembly listing as a string
    """
    if not address:
        return "Error: Address parameter is required"
    port = _get_instance_port(port)
    response = safe_get(port, f"functions/{address}/disassembly")
    return _disassembly_text_or_error(simplify_response(response))


@mcp.resource(uri="/instance/{port}/function/disassembly/name/{name}")
def disassembly_by_name(port: int = None, name: str = None) -> str:
    """Get disassembled instructions for a function by name

    Args:
        port: Specific Ghidra instance port
        name: Function name

    Returns:
        str: Formatted disassembly listing as a string
    """
    if not name:
        return "Error: Name parameter is required"
    port = _get_instance_port(port)
    response = safe_get(port, f"functions/by-name/{quote(name)}/disassembly")
    return _disassembly_text_or_error(simplify_response(response))
# ================= Enumeration Resources =================
# Lightweight read-only resources for listing/enumerating Ghidra data
# More efficient than tool calls for simple data access

@mcp.resource(uri="/instances")
def resource_instances_list() -> dict:
    """List all active Ghidra instances (lightweight summary).

    Auto-discovers instances on the default host before listing. For full
    program details use the /instance/{port} resource instead.

    Returns:
        dict: instances (port/project/file/url), count, and current_port.
    """
    # Refresh the registry before reporting.
    _discover_instances(QUICK_DISCOVERY_RANGE, host=None, timeout=0.5)
    with instances_lock:
        snapshot = [
            {
                "port": port,
                "project": details.get("project", ""),
                "file": details.get("file", ""),
                "url": details.get("url", f"http://{ghidra_host}:{port}"),
            }
            for port, details in active_instances.items()
        ]
    return {
        "instances": snapshot,
        "count": len(snapshot),
        "current_port": current_instance_port,
        "_hint": "Use /instance/{port} for detailed program info"
    }


@mcp.resource(uri="/instance/{port}/functions")
def resource_functions_list(port: int = None) -> dict:
    """Enumerate program functions (read-only, capped at 1000 entries).

    Args:
        port: Ghidra instance port (current instance when omitted).

    Returns:
        dict: functions (name/address/size), count, and a truncation flag.
    """
    port = _get_instance_port(port)
    # Cap the fetch so the resource response stays small.
    simplified = simplify_response(safe_get(port, "functions", {"limit": 1000}))
    if not simplified.get("success", True):
        return simplified
    raw = simplified.get("result", simplified.get("functions", []))
    if isinstance(raw, dict):
        raw = raw.get("functions", [])
    trimmed = [
        {
            "name": entry.get("name", "unknown"),
            "address": entry.get("entryPoint", entry.get("address", "")),
            "size": entry.get("size", 0),
        }
        for entry in raw[:1000]
        if isinstance(entry, dict)
    ]
    return {
        "functions": trimmed,
        "count": len(trimmed),
        "truncated": len(raw) > 1000,
        "_hint": "Use functions_list tool for filtering and pagination of large lists"
    }


@mcp.resource(uri="/instance/{port}/strings")
def resource_strings_list(port: int = None) -> dict:
    """Enumerate defined strings (read-only, capped at 500 entries).

    Args:
        port: Ghidra instance port (current instance when omitted).

    Returns:
        dict: strings (address/value/length), count, and a truncation flag.
    """
    port = _get_instance_port(port)
    # Strings can be verbose, so cap lower than other listings.
    simplified = simplify_response(safe_get(port, "strings", {"limit": 500}))
    if not simplified.get("success", True):
        return simplified
    raw = simplified.get("result", simplified.get("strings", []))
    if isinstance(raw, dict):
        raw = raw.get("strings", [])
    trimmed = [
        {
            "address": entry.get("address", ""),
            # Truncate long string values for the lightweight view.
            "value": entry.get("value", entry.get("string", ""))[:200],
            "length": entry.get("length", len(entry.get("value", ""))),
        }
        for entry in raw[:500]
        if isinstance(entry, dict)
    ]
    return {
        "strings": trimmed,
        "count": len(trimmed),
        "truncated": len(raw) > 500,
        "_hint": "Use data_list_strings tool for full strings and pagination"
    }
@mcp.resource(uri="/instance/{port}/data")
def resource_data_list(port: int = None) -> dict:
    """Enumerate defined data items (read-only, capped at 1000 entries).

    Args:
        port: Ghidra instance port (current instance when omitted).

    Returns:
        dict: data items (address/name/type), count, and a truncation flag.
    """
    port = _get_instance_port(port)
    simplified = simplify_response(safe_get(port, "data", {"limit": 1000}))
    if not simplified.get("success", True):
        return simplified
    raw = simplified.get("result", simplified.get("data", []))
    if isinstance(raw, dict):
        raw = raw.get("data", [])
    trimmed = [
        {
            "address": entry.get("address", ""),
            "name": entry.get("name", entry.get("label", "")),
            "type": entry.get("type", entry.get("dataType", "")),
        }
        for entry in raw[:1000]
        if isinstance(entry, dict)
    ]
    return {
        "data": trimmed,
        "count": len(trimmed),
        "truncated": len(raw) > 1000,
        "_hint": "Use data_list tool for filtering and pagination"
    }


@mcp.resource(uri="/instance/{port}/structs")
def resource_structs_list(port: int = None) -> dict:
    """Enumerate struct types (read-only, capped at 500 entries).

    Args:
        port: Ghidra instance port (current instance when omitted).

    Returns:
        dict: structs (name/size/category), count, and a truncation flag.
    """
    port = _get_instance_port(port)
    simplified = simplify_response(safe_get(port, "structs", {"limit": 500}))
    if not simplified.get("success", True):
        return simplified
    raw = simplified.get("result", simplified.get("structs", []))
    if isinstance(raw, dict):
        raw = raw.get("structs", [])
    trimmed = [
        {
            "name": entry.get("name", ""),
            "size": entry.get("size", entry.get("length", 0)),
            "category": entry.get("category", entry.get("categoryPath", "")),
        }
        for entry in raw[:500]
        if isinstance(entry, dict)
    ]
    return {
        "structs": trimmed,
        "count": len(trimmed),
        "truncated": len(raw) > 500,
        "_hint": "Use structs_list tool for pagination, structs_get for field details"
    }
@mcp.resource(uri="/instance/{port}/xrefs/to/{address}")
def resource_xrefs_to(port: int = None, address: str = None) -> dict:
    """List cross-references TO an address (lightweight, capped at 200).

    Args:
        port: Ghidra instance port (current instance when omitted).
        address: Target address in hex format.

    Returns:
        dict: references to this address, count, and a truncation flag.
    """
    if not address:
        return {"error": "Address parameter required"}
    port = _get_instance_port(port)
    simplified = simplify_response(
        safe_get(port, "xrefs", {"toAddress": address, "limit": 200}))
    if not simplified.get("success", True):
        return simplified
    raw = simplified.get("result", simplified.get("xrefs", []))
    if isinstance(raw, dict):
        raw = raw.get("xrefs", [])
    entries = [
        {
            "from": ref.get("fromAddress", ref.get("from", "")),
            "type": ref.get("refType", ref.get("type", "")),
            # Keep only a short context snippet for the lightweight view.
            "context": ref.get("context", "")[:100] if ref.get("context") else "",
        }
        for ref in raw[:200]
        if isinstance(ref, dict)
    ]
    return {
        "to_address": address,
        "references": entries,
        "count": len(entries),
        "truncated": len(raw) > 200,
        "_hint": "Use xrefs_list tool for full filtering and pagination"
    }


@mcp.resource(uri="/instance/{port}/xrefs/from/{address}")
def resource_xrefs_from(port: int = None, address: str = None) -> dict:
    """List cross-references FROM an address (lightweight, capped at 200).

    Args:
        port: Ghidra instance port (current instance when omitted).
        address: Source address in hex format.

    Returns:
        dict: references from this address, count, and a truncation flag.
    """
    if not address:
        return {"error": "Address parameter required"}
    port = _get_instance_port(port)
    simplified = simplify_response(
        safe_get(port, "xrefs", {"fromAddress": address, "limit": 200}))
    if not simplified.get("success", True):
        return simplified
    raw = simplified.get("result", simplified.get("xrefs", []))
    if isinstance(raw, dict):
        raw = raw.get("xrefs", [])
    entries = [
        {
            "to": ref.get("toAddress", ref.get("to", "")),
            "type": ref.get("refType", ref.get("type", "")),
            "context": ref.get("context", "")[:100] if ref.get("context") else "",
        }
        for ref in raw[:200]
        if isinstance(ref, dict)
    ]
    return {
        "from_address": address,
        "references": entries,
        "count": len(entries),
        "truncated": len(raw) > 200,
        "_hint": "Use xrefs_list tool for full filtering and pagination"
    }


def _entity_total(port: int, endpoint: str):
    """Best-effort total item count for a listing endpoint.

    Returns the backend-reported total, or "unknown" when the endpoint fails
    or returns an unexpected shape. (Previously each count was fetched with
    duplicated inline code, and a non-dict response silently omitted the
    statistics key; "unknown" keeps the summary schema stable.)
    """
    try:
        response = safe_get(port, endpoint, {"limit": 1})
        if isinstance(response, dict):
            total = response.get("result", {}).get("total", 0)
            if not total:
                total = response.get("total", 0)
            return total
    except Exception:
        pass
    return "unknown"


@mcp.resource(uri="/instance/{port}/summary")
def resource_program_summary(port: int = None) -> dict:
    """Get a comprehensive summary of the loaded program.

    Combines instance info with counts of functions, strings, and data.
    Useful for getting a quick overview before detailed analysis.

    Args:
        port: Ghidra instance port (current instance when omitted).

    Returns:
        dict: Program summary with statistics.
    """
    port = _get_instance_port(port)
    program_info = ghidra_instance(port=port)
    if "error" in program_info:
        return program_info
    summary = {
        "program": program_info,
        "statistics": {
            "functions": _entity_total(port, "functions"),
            "strings": _entity_total(port, "strings"),
            "data_items": _entity_total(port, "data"),
        },
    }
    summary["_hint"] = "Use /instance/{port}/functions, /strings, /data for listings"
    return summary
# ================= MCP Prompts =================
# Prompts define reusable templates for LLM interactions

def _gather_function_context(name, address, port):
    """Fetch decompilation, disassembly, and metadata for a function.

    Resolves *name* from *address* when only the address is given, using a
    single metadata fetch (the original code fetched function info twice
    over HTTP in that case). Returns (name, decompiled, disasm, fn_info).
    """
    decompiled = ""
    disasm = ""
    fn_info = None
    if address:
        decompiled = decompiled_function_by_address(address=address, port=port)
        disasm = disassembly_by_address(address=address, port=port)
        fn_info = function_info_by_address(address=address, port=port)
        if not name and isinstance(fn_info, dict) and "name" in fn_info:
            name = fn_info["name"]
    elif name:
        decompiled = decompiled_function_by_name(name=name, port=port)
        disasm = disassembly_by_name(name=name, port=port)
        fn_info = function_info_by_name(name=name, port=port)
    return name, decompiled, disasm, fn_info


@mcp.prompt("analyze_function")
def analyze_function_prompt(name: str = None, address: str = None, port: int = None):
    """A prompt to guide the LLM through analyzing a function

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)
    """
    port = _get_instance_port(port)
    name, decompiled, disasm, fn_info = _gather_function_context(name, address, port)
    return {
        "prompt": f"""
Analyze the following function: {name or address}

Decompiled code:
```c
{decompiled}
```

Disassembly:
```
{disasm}
```

1. What is the purpose of this function?
2. What are the key parameters and their uses?
3. What are the return values and their meanings?
4. Are there any security concerns in this implementation?
5. Describe the algorithm or process being implemented.
""",
        "context": {
            "function_info": fn_info
        }
    }


@mcp.prompt("identify_vulnerabilities")
def identify_vulnerabilities_prompt(name: str = None, address: str = None, port: int = None):
    """A prompt to help identify potential vulnerabilities in a function

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)
    """
    port = _get_instance_port(port)
    name, decompiled, disasm, fn_info = _gather_function_context(name, address, port)
    return {
        "prompt": f"""
Analyze the following function for security vulnerabilities: {name or address}

Decompiled code:
```c
{decompiled}
```

Look for these vulnerability types:
1. Buffer overflows or underflows
2. Integer overflow/underflow
3. Use-after-free or double-free bugs
4. Format string vulnerabilities
5. Missing bounds checks
6. Insecure memory operations
7. Race conditions or timing issues
8. Input validation problems

For each potential vulnerability:
- Describe the vulnerability and where it occurs
- Explain the security impact
- Suggest how it could be exploited
- Recommend a fix
""",
        "context": {
            "function_info": fn_info,
            "disassembly": disasm
        }
    }


@mcp.prompt("reverse_engineer_binary")
def reverse_engineer_binary_prompt(port: int = None):
    """A comprehensive prompt to guide the process of reverse engineering an entire binary

    Args:
        port: Specific Ghidra instance port (optional)
    """
    port = _get_instance_port(port)
    # Get program info for context
    program_info = ghidra_instance(port=port)
    return {
        "prompt": f"""
# Comprehensive Binary Reverse Engineering Plan

Begin reverse engineering the binary {program_info.get('program_name', 'unknown')} using a methodical approach.

## Phase 1: Initial Reconnaissance
1. Analyze entry points and the main function
2. Identify and catalog key functions and libraries
3. Map the overall program structure
4. Identify important data structures

## Phase 2: Functional Analysis
1. Start with main() or entry point functions and trace the control flow
2. Find and rename all unnamed functions (FUN_*) called from main
3. For each function:
   - Decompile and analyze its purpose
   - Rename with descriptive names following consistent patterns
   - Add comments for complex logic
   - Identify parameters and return values
4. Follow cross-references (xrefs) to understand context of function usage
5. Pay special attention to:
   - File I/O operations
   - Network communication
   - Memory allocation/deallocation
   - Authentication/encryption routines
   - Data processing algorithms

## Phase 3: Data Flow Mapping
1. Identify key data structures and rename them meaningfully
2. Track global variables and their usage across functions
3. Map data transformations through the program
4. Identify sensitive data handling (keys, credentials, etc.)

## Phase 4: Deep Analysis
1. For complex functions, perform deeper analysis using:
   - Data flow analysis
   - Call graph analysis
   - Security vulnerability scanning
2. Look for interesting patterns:
   - Command processing routines
   - State machines
   - Protocol implementations
   - Cryptographic operations

## Implementation Strategy
1. Start with functions called from main
2. Search for unnamed functions with pattern "FUN_*"
3. Decompile each function and analyze its purpose
4. Look at its call graph and cross-references to understand context
5. Rename the function based on its behavior
6. Document key insights
7. Continue iteratively until the entire program flow is mapped

## Function Prioritization
1. Start with entry points and initialization functions
2. Focus on functions with high centrality in the call graph
3. Pay special attention to functions with:
   - Command processing logic
   - Error handling
   - Security checks
   - Data transformation

Remember to use the available GhydraMCP tools:
- Use functions_list to find functions matching patterns
- Use xrefs_list to find cross-references
- Use functions_decompile for C-like representations
- Use functions_disassemble for lower-level analysis
- Use functions_rename to apply meaningful names
- Use data_* tools to work with program data
""",
        "context": {
            "program_info": program_info
        }
    }


# ================= MCP Tools =================
# Since we can't use tool groups, we'll use namespaces in the function names

def _instances_snapshot() -> dict:
    """Consistent snapshot of the registry in the tools' response shape.

    Shared by instances_list and instances_discover (previously duplicated).
    """
    with instances_lock:
        return {
            "instances": [
                {
                    "port": port,
                    "url": info["url"],
                    "project": info.get("project", ""),
                    "file": info.get("file", "")
                }
                for port, info in active_instances.items()
            ]
        }


@mcp.tool()
def instances_list() -> dict:
    """List all active Ghidra instances

    This is the primary tool for working with instances. It automatically
    discovers new instances on the default host before listing.
    Use instances_discover(host) only if you need to scan a different host.

    Returns:
        dict: Contains 'instances' list with all available Ghidra instances
    """
    # Auto-discover new instances before listing
    _discover_instances(QUICK_DISCOVERY_RANGE, host=None, timeout=0.5)
    return _instances_snapshot()


@mcp.tool()
def instances_discover(host: str = None) -> dict:
    """Discover Ghidra instances on a specific host

    Use this ONLY when you need to discover instances on a different host.
    For normal usage, just use instances_list() which auto-discovers on the
    default host.

    Args:
        host: Host to scan for Ghidra instances (default: configured ghidra_host)

    Returns:
        dict: Contains 'instances' list with all available instances after discovery
    """
    _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5)
    # Return all instances (same format as instances_list for consistency)
    return _instances_snapshot()
@mcp.tool()
def instances_register(port: int, url: str = None) -> str:
    """Register a new Ghidra instance

    Args:
        port: Port number of the Ghidra instance
        url: Optional URL if different from default http://host:port

    Returns:
        str: Confirmation message or error
    """
    # Thin wrapper over the module-level registration helper.
    return register_instance(port, url)


@mcp.tool()
def instances_unregister(port: int) -> str:
    """Unregister a Ghidra instance

    Args:
        port: Port number of the instance to unregister

    Returns:
        str: Confirmation message or error
    """
    with instances_lock:
        if active_instances.pop(port, None) is not None:
            return f"Unregistered instance on port {port}"
    return f"No instance found on port {port}"


@mcp.tool()
def instances_use(port: int) -> str:
    """Set the current working Ghidra instance

    Args:
        port: Port number of the instance to use

    Returns:
        str: Confirmation message or error
    """
    global current_instance_port
    # Validate the instance, attempting registration if it is unknown.
    if port not in active_instances:
        register_instance(port)
        if port not in active_instances:
            return f"Error: No active Ghidra instance found on port {port}"
    current_instance_port = port
    # Report which program/project the selected instance is serving.
    with instances_lock:
        info = active_instances[port]
        program = info.get("file", "unknown program")
        project = info.get("project", "unknown project")
    return f"Now using Ghidra instance on port {port} with {program} in project {project}"
@mcp.tool()
def instances_current() -> dict:
    """Get information about the current working Ghidra instance

    Returns:
        dict: Details about the current instance and program
    """
    return ghidra_instance(port=current_instance_port)


# ================= Cursor Management Tools =================
# Tools for managing pagination cursors with session isolation

def _get_session_id(ctx: Context = None) -> str:
    """Get session ID from FastMCP context

    Uses the session object's id() for reliable session tracking. The
    session object persists across tool calls within the same MCP
    connection.

    Security: This function does NOT accept manual session_id overrides
    to prevent session spoofing attacks.
    """
    if ctx:
        # Prefer a client_id explicitly provided by the client
        if hasattr(ctx, 'client_id') and ctx.client_id:
            return f"client-{ctx.client_id}"
        # Use the session object's memory id as a unique identifier; it
        # persists across tool calls within the same MCP connection
        if hasattr(ctx, 'session') and ctx.session:
            return f"session-{id(ctx.session)}"
        # Fallback to a request_id prefix for stdio transport.  Slicing
        # is a no-op for ids shorter than 8 chars, so the previous
        # explicit length check was redundant.
        if hasattr(ctx, 'request_id') and ctx.request_id:
            return f"req-{ctx.request_id[:8]}"
    return "default"


@mcp.tool()
def cursor_next(cursor_id: str, ctx: Context = None) -> dict:
    """Get the next page of results for a pagination cursor

    Args:
        cursor_id: The cursor ID from a previous paginated response
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Next page of results with updated pagination info
    """
    if not cursor_id:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "cursor_id parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    sid = _get_session_id(ctx)
    state = cursor_manager.advance_cursor(cursor_id, sid)
    if not state:
        return {
            "success": False,
            "error": {
                "code": "CURSOR_NOT_FOUND",
                "message": f"Cursor '{cursor_id}' not found, expired, or belongs to another session"
            },
            "timestamp": int(time.time() * 1000)
        }

    current_page = cursor_manager.get_page(state)
    # Only hand the cursor back to the caller while more pages remain
    response_cursor = cursor_id if state.has_more else None
    response = {
        "success": True,
        "result": current_page,
        "pagination": {
            "cursor_id": response_cursor,
            "session_id": state.session_id,
            "tool_name": state.tool_name,
            "total_count": state.total_count,
            "filtered_count": state.filtered_count,
            "page_size": state.page_size,
            "current_page": state.current_page,
            "total_pages": state.total_pages,
            "has_more": state.has_more,
            "grep_pattern": state.grep_pattern,
            "items_returned": len(current_page),
            "ttl_remaining": state.ttl_remaining,
        },
        "timestamp": int(time.time() * 1000)
    }

    # Add prominent message for LLMs.  (A previously computed
    # `total_fetched` local in the final-page branch was unused and has
    # been removed.)
    if state.has_more:
        remaining = state.filtered_count - (state.current_page * state.page_size)
        response["_message"] = (
            f"📄 Page {state.current_page}/{state.total_pages}: "
            f"{len(current_page)} items. {remaining} more available. "
            f"Continue with: cursor_next(cursor_id='{cursor_id}')"
        )
    else:
        response["_message"] = (
            f"✅ Final page {state.current_page}/{state.total_pages}: "
            f"{len(current_page)} items. All {state.filtered_count} items retrieved."
        )
    return response
@mcp.tool()
def cursor_list(ctx: Context = None, all_sessions: bool = False) -> dict:
    """List active pagination cursors

    Args:
        ctx: FastMCP context (auto-injected)
        all_sessions: If True, list cursors from all sessions (admin use)

    Returns:
        dict: List of active cursors with their metadata
    """
    # A None session id means "no filter" (all sessions)
    session_filter = None if all_sessions else _get_session_id(ctx)
    return {
        "success": True,
        "result": cursor_manager.list_cursors(session_id=session_filter),
        "stats": cursor_manager.get_stats(),
        "timestamp": int(time.time() * 1000)
    }


@mcp.tool()
def cursor_delete(cursor_id: str, ctx: Context = None) -> dict:
    """Delete a pagination cursor to free resources

    Args:
        cursor_id: The cursor ID to delete
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Operation result
    """
    now_ms = int(time.time() * 1000)
    if not cursor_id:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "cursor_id parameter is required"
            },
            "timestamp": now_ms
        }

    session_key = _get_session_id(ctx)
    if cursor_manager.delete_cursor(cursor_id, session_key):
        return {
            "success": True,
            "result": {
                "deleted": True,
                "cursor_id": cursor_id,
                "message": "Cursor deleted successfully"
            },
            "timestamp": int(time.time() * 1000)
        }
    return {
        "success": False,
        "error": {
            "code": "CURSOR_NOT_FOUND",
            "message": f"Cursor '{cursor_id}' not found or belongs to another session"
        },
        "timestamp": int(time.time() * 1000)
    }


@mcp.tool()
def cursor_delete_all(ctx: Context = None) -> dict:
    """Delete all pagination cursors for the current session

    Args:
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Number of cursors deleted
    """
    session_key = _get_session_id(ctx)
    removed = cursor_manager.delete_session_cursors(session_key)
    return {
        "success": True,
        "result": {
            "deleted_count": removed,
            "session_id": session_key,
            "message": f"Deleted {removed} cursor(s) for session '{session_key}'"
        },
        "timestamp": int(time.time() * 1000)
    }

# ================= End Cursor Management Tools =================

# Function tools


@mcp.tool()
def functions_list(
    name_contains: str = None,
    name_matches_regex: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List functions with cursor-based pagination and grep filtering

    Args:
        name_contains: Substring name filter (case-insensitive, server-side)
        name_matches_regex: Regex name filter (server-side)
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500)
        grep: Regex pattern to filter results client-side (e.g., "main|init", "FUN_.*")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Bypass pagination and return all results (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: List of functions with pagination info. Use cursor_next(cursor_id) for more.

    Examples:
        functions_list()                       # first page of all functions
        functions_list(name_contains="main")   # server-side substring filter
        functions_list(grep="^FUN_")           # client-side regex filter
        functions_list(return_all=True)        # everything (use carefully!)
    """
    target_port = _get_instance_port(port)
    session_key = _get_session_id(ctx)

    # Over-fetch from Ghidra so client-side grep + pagination have
    # enough material to work with.
    fetch_limit = 5000 if return_all else max(page_size * 10, 500)
    params = {"offset": 0, "limit": fetch_limit}
    server_filters = {
        "name_contains": name_contains,
        "name_matches_regex": name_matches_regex,
    }
    params.update({k: v for k, v in server_filters.items() if v})

    simplified = simplify_response(safe_get(target_port, "functions", params))

    # Propagate error responses unchanged
    if not isinstance(simplified, dict) or not simplified.get("success", False):
        return simplified
    result_data = simplified.get("result", [])
    if not isinstance(result_data, list):
        return simplified

    # Query params participate in the cursor hash so distinct queries
    # get distinct cursors.
    query_params = {
        "tool": "functions_list",
        "port": target_port,
        "name_contains": name_contains,
        "name_matches_regex": name_matches_regex,
        "grep": grep
    }
    return paginate_response(
        data=result_data,
        query_params=query_params,
        tool_name="functions_list",
        session_id=session_key,
        page_size=page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )


@mcp.tool()
def functions_get(name: str = None, address: str = None, port: int = None) -> dict:
    """Get detailed information about a function

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Detailed function information
    """
    if not name and not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    # Address lookup takes precedence when both identifiers are supplied
    endpoint = (
        f"functions/{address}" if address
        else f"functions/by-name/{quote(name)}"
    )
    return simplify_response(safe_get(target_port, endpoint))
@mcp.tool()
def functions_decompile(
    name: str = None,
    address: str = None,
    syntax_tree: bool = False,
    style: str = "normalize",
    port: int = None,
    # Pagination parameters (line-based)
    page_size: int = 50,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Get decompiled code for a function with cursor-based line pagination

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        syntax_tree: Include syntax tree (default: False)
        style: Decompiler style (default: "normalize")
        port: Specific Ghidra instance port (optional)
        page_size: Lines per page (default: 50, max: 500)
        grep: Regex pattern to filter lines (e.g., "if.*==", "malloc|free")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all lines without pagination (use with caution for large functions)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Decompiled code with pagination. Use cursor_next(cursor_id) for more lines.

    Examples:
        functions_decompile(name="main")                        # first 50 lines
        functions_decompile(name="main", grep="if.*NULL")       # pattern search
        functions_decompile(name="small_func", return_all=True) # whole function
    """
    if not name and not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port_to_use = _get_instance_port(port)
    params = {
        "syntax_tree": str(syntax_tree).lower(),
        "style": style
    }
    if address:
        endpoint = f"functions/{address}/decompile"
        func_id = address
    else:
        endpoint = f"functions/by-name/{quote(name)}/decompile"
        func_id = name

    response = safe_get(port_to_use, endpoint, params)
    simplified = simplify_response(response)
    # BUGFIX: guard against non-dict responses before calling .get(),
    # consistent with functions_list / data_list_strings.
    if not isinstance(simplified, dict) or not simplified.get("success", False):
        return simplified

    # Extract the decompiled code and split into lines
    result = simplified.get("result", {})
    code = result.get("code", "") if isinstance(result, dict) else ""
    if not code:
        return simplified  # Return as-is if no code

    # Attach 1-based line numbers so grep hits stay locatable
    lines = code.split('\n')
    line_objects = [{"line_num": i + 1, "code": line}
                    for i, line in enumerate(lines)]

    # Build query params for cursor hashing
    query_params = {
        "tool": "functions_decompile",
        "port": port_to_use,
        "name": name,
        "address": address,
        "style": style,
        "grep": grep
    }
    sid = _get_session_id(ctx)
    paginated = paginate_response(
        data=line_objects,
        query_params=query_params,
        tool_name="functions_decompile",
        session_id=sid,
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )

    # Transform result back to code format with line numbers
    if paginated.get("success"):
        page_lines = paginated.get("result", [])
        # Format as "line_num: code" for clarity
        formatted_lines = [f"{item['line_num']:4d}: {item['code']}"
                           for item in page_lines]
        paginated["result"] = {
            "function": func_id,
            "code_lines": formatted_lines,
            "raw_lines": [item['code'] for item in page_lines]
        }
        # Add function metadata if available
        if isinstance(result, dict):
            for key in ["name", "address", "signature", "return_type"]:
                if key in result:
                    paginated["result"][key] = result[key]
    return paginated
@mcp.tool()
def functions_disassemble(
    name: str = None,
    address: str = None,
    port: int = None,
    # Pagination parameters (instruction-based)
    page_size: int = 50,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Get disassembly for a function with cursor-based instruction pagination

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)
        page_size: Instructions per page (default: 50, max: 500)
        grep: Regex pattern to filter instructions (e.g., "CALL", "JMP|JNZ", "MOV.*EAX")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all instructions without pagination
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Disassembly with pagination. Use cursor_next(cursor_id) for more instructions.

    Examples:
        functions_disassemble(name="main")               # first 50 instructions
        functions_disassemble(name="main", grep="CALL")  # all CALL instructions
        functions_disassemble(name="main", grep="^J")    # jumps / conditional jumps
    """
    if not name and not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port_to_use = _get_instance_port(port)
    if address:
        endpoint = f"functions/{address}/disassembly"
        func_id = address
    else:
        endpoint = f"functions/by-name/{quote(name)}/disassembly"
        func_id = name

    response = safe_get(port_to_use, endpoint)
    simplified = simplify_response(response)
    # BUGFIX: guard against non-dict responses before calling .get(),
    # consistent with functions_list.
    if not isinstance(simplified, dict) or not simplified.get("success", False):
        return simplified

    # The backend may return structured instructions, plain text, or both
    result = simplified.get("result", {})
    if isinstance(result, dict):
        disasm_text = result.get("disassembly", "") or result.get("text", "")
        instructions = result.get("instructions", [])
    elif isinstance(result, str):
        disasm_text = result
        instructions = []
    else:
        disasm_text = ""
        instructions = []

    # Prefer structured instructions; otherwise parse the text dump
    if instructions:
        line_objects = instructions
    elif disasm_text:
        lines = disasm_text.strip().split('\n')
        line_objects = [{"addr": f"line_{i+1}", "instruction": line}
                        for i, line in enumerate(lines) if line.strip()]
    else:
        return simplified  # Return as-is if no disassembly

    # Build query params for cursor hashing
    query_params = {
        "tool": "functions_disassemble",
        "port": port_to_use,
        "name": name,
        "address": address,
        "grep": grep
    }
    sid = _get_session_id(ctx)
    paginated = paginate_response(
        data=line_objects,
        query_params=query_params,
        tool_name="functions_disassemble",
        session_id=sid,
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )

    # Add function context to result
    if paginated.get("success"):
        page_instructions = paginated.get("result", [])
        paginated["result"] = {
            "function": func_id,
            "instructions": page_instructions
        }
        # Add function metadata if available
        if isinstance(result, dict):
            for key in ["name", "address", "entry_point", "size"]:
                if key in result:
                    paginated["result"][key] = result[key]
    return paginated


@mcp.tool()
def functions_create(address: str, port: int = None) -> dict:
    """Create a new function at the specified address

    Args:
        address: Memory address in hex format where function starts
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the created function information
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }
    port = _get_instance_port(port)
    payload = {"address": address}
    response = safe_post(port, "functions", payload)
    return simplify_response(response)


@mcp.tool()
def functions_rename(old_name: str = None, address: str = None, new_name: str = "", port: int = None) -> dict:
    """Rename a function

    Args:
        old_name: Current function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        new_name: New function name
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the updated function information
    """
    if not (old_name or address) or not new_name:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either old_name or address, and new_name parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }
    port = _get_instance_port(port)
    payload = {"name": new_name}
    if address:
        endpoint = f"functions/{address}"
    else:
        endpoint = f"functions/by-name/{quote(old_name)}"
    response = safe_patch(port, endpoint, payload)
    return simplify_response(response)
@mcp.tool()
def functions_set_signature(name: str = None, address: str = None, signature: str = "", port: int = None) -> dict:
    """Set function signature/prototype

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        signature: New function signature (e.g., "int func(char *data, int size)")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the updated function information
    """
    if not (name or address) or not signature:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address, and signature parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    # Address lookup wins when both identifiers are supplied
    endpoint = (
        f"functions/{address}" if address
        else f"functions/by-name/{quote(name)}"
    )
    return simplify_response(safe_patch(target_port, endpoint, {"signature": signature}))


@mcp.tool()
def functions_get_variables(
    name: str = None,
    address: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Get variables for a function with cursor-based pagination

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)
        page_size: Variables per page (default: 50, max: 500)
        grep: Regex pattern to filter variables (e.g., "local_", "param", "ptr.*int")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all variables without pagination
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Variables with pagination. Use cursor_next(cursor_id) for more.

    Examples:
        functions_get_variables(name="main", grep="local_")   # local variables
        functions_get_variables(name="main", grep="ptr|\\*")  # pointer variables
    """
    if not name and not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    if address:
        endpoint = f"functions/{address}/variables"
        func_id = address
    else:
        endpoint = f"functions/by-name/{quote(name)}/variables"
        func_id = name

    simplified = simplify_response(safe_get(target_port, endpoint))
    if not simplified.get("success", False):
        return simplified

    # Pull the variable list out of the result payload
    result = simplified.get("result", {})
    variables = result.get("variables", []) if isinstance(result, dict) else []
    if not variables:
        return simplified  # Nothing to paginate

    query_params = {
        "tool": "functions_get_variables",
        "port": target_port,
        "name": name,
        "address": address,
        "grep": grep
    }
    session_key = _get_session_id(ctx)
    paginated = paginate_response(
        data=variables,
        query_params=query_params,
        tool_name="functions_get_variables",
        session_id=session_key,
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )

    # Re-wrap the page with function context and preserved metadata
    if paginated.get("success"):
        paginated["result"] = {
            "function": func_id,
            "variables": paginated.get("result", [])
        }
        if isinstance(result, dict):
            for key in ["name", "address", "parameter_count", "local_count"]:
                if key in result:
                    paginated["result"][key] = result[key]
    return paginated


# Memory tools
@mcp.tool()
def memory_read(address: str, length: int = 16, format: str = "hex", port: int = None) -> dict:
    """Read bytes from memory

    Args:
        address: Memory address in hex format
        length: Number of bytes to read (default: 16)
        format: Output format - "hex", "base64", or "string" (default: "hex")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: {
            "address": original address,
            "length": bytes read,
            "format": output format,
            "hexBytes": the memory contents as hex string,
            "rawBytes": the memory contents as base64 string,
            "timestamp": response timestamp
        }
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    # Query parameters are more reliably handled than path parameters here
    query = {"address": address, "length": length, "format": format}
    simplified = simplify_response(safe_get(target_port, "memory", query))

    # Flatten the nested result into a simple, directly usable dict
    if "result" in simplified and isinstance(simplified["result"], dict):
        result = simplified["result"]
        memory_info = {
            "success": True,
            "address": result.get("address", address),
            "length": result.get("bytesRead", length),
            "format": format,
            "timestamp": simplified.get("timestamp", int(time.time() * 1000))
        }
        # Pass through every byte representation the server provided
        for rep in ("hexBytes", "rawBytes"):
            if rep in result:
                memory_info[rep] = result[rep]
        return memory_info
    return simplified
@mcp.tool()
def memory_write(address: str, bytes_data: str, format: str = "hex", port: int = None) -> dict:
    """Write bytes to memory (use with caution)

    Args:
        address: Memory address in hex format
        bytes_data: Data to write (format depends on 'format' parameter)
        format: Input format - "hex", "base64", or "string" (default: "hex")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with success status
    """
    now_ms = int(time.time() * 1000)
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": now_ms
        }
    if not bytes_data:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Bytes parameter is required"
            },
            "timestamp": now_ms
        }

    target_port = _get_instance_port(port)
    payload = {"bytes": bytes_data, "format": format}
    # Memory write is handled by ProgramEndpoints, not MemoryEndpoints
    response = safe_patch(target_port, f"programs/current/memory/{address}", payload)
    return simplify_response(response)


# Xrefs tools
@mcp.tool()
def xrefs_list(
    to_addr: str = None,
    from_addr: str = None,
    type: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List cross-references with filtering and cursor-based pagination

    Args:
        to_addr: Filter references to this address (hexadecimal)
        from_addr: Filter references from this address (hexadecimal)
        type: Filter by reference type (e.g. "CALL", "READ", "WRITE")
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500)
        grep: Regex pattern to filter results
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all results without pagination (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Cross-references with pagination metadata and cursor for more results
    """
    # At least one of the address parameters must be provided
    if not to_addr and not from_addr:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either to_addr or from_addr parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    # Fetch up to 10K so client-side cursor pagination has material
    params = {"offset": 0, "limit": 10000}
    params.update({k: v for k, v in
                   {"to_addr": to_addr, "from_addr": from_addr, "type": type}.items()
                   if v})

    simplified = simplify_response(safe_get(target_port, "xrefs", params))
    if not simplified.get("success", False):
        return simplified
    all_xrefs = simplified.get("result", [])

    # Query params participate in the cursor hash
    query_params = {
        "tool": "xrefs_list",
        "port": target_port,
        "to_addr": to_addr,
        "from_addr": from_addr,
        "type": type,
        "grep": grep
    }
    return paginate_response(
        data=all_xrefs,
        query_params=query_params,
        tool_name="xrefs_list",
        session_id=_get_session_id(ctx),
        page_size=page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )
# Data tools
@mcp.tool()
def data_list(
    addr: str = None,
    name: str = None,
    name_contains: str = None,
    type: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List defined data items with filtering and cursor-based pagination

    Args:
        addr: Filter by address (hexadecimal)
        name: Exact name match filter (case-sensitive)
        name_contains: Substring name filter (case-insensitive)
        type: Filter by data type (e.g. "string", "dword")
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500)
        grep: Regex pattern to filter results
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all results without pagination (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Data items with pagination metadata and cursor for more results
    """
    target_port = _get_instance_port(port)
    # Fetch up to 10K so client-side cursor pagination has material
    params = {"offset": 0, "limit": 10000}
    server_filters = {
        "addr": addr,
        "name": name,
        "name_contains": name_contains,
        "type": type,
    }
    params.update({k: v for k, v in server_filters.items() if v})

    simplified = simplify_response(safe_get(target_port, "data", params))
    if not simplified.get("success", False):
        return simplified
    all_data = simplified.get("result", [])

    # Query params participate in the cursor hash
    query_params = {
        "tool": "data_list",
        "port": target_port,
        "addr": addr,
        "name": name,
        "name_contains": name_contains,
        "type": type,
        "grep": grep
    }
    return paginate_response(
        data=all_data,
        query_params=query_params,
        tool_name="data_list",
        session_id=_get_session_id(ctx),
        page_size=page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )


@mcp.tool()
def data_create(address: str, data_type: str, size: int = None, port: int = None) -> dict:
    """Define a new data item at the specified address

    Args:
        address: Memory address in hex format
        data_type: Data type (e.g. "string", "dword", "byte")
        size: Optional size in bytes for the data item
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the created data information
    """
    if not address or not data_type:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and data_type parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    payload = {"address": address, "type": data_type}
    if size is not None:
        payload["size"] = size
    return simplify_response(safe_post(target_port, "data", payload))


@mcp.tool()
def data_list_strings(
    filter: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List all defined strings in the binary with cursor-based pagination and grep filtering

    Args:
        filter: Server-side string content filter
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500)
        grep: Regex pattern to filter results client-side (e.g., "password|key", "http://")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Bypass pagination and return all strings (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: List of string data with pagination info. Use cursor_next(cursor_id) for more.

    Examples:
        data_list_strings()                     # first page
        data_list_strings(filter="error")       # server-side substring filter
        data_list_strings(grep="https?://")     # client-side regex for URLs
        data_list_strings(return_all=True)      # everything (bypasses pagination)
    """
    # NOTE: `filter` shadows the builtin, but the parameter name is part
    # of the tool's public interface and must stay.
    target_port = _get_instance_port(port)
    session_key = _get_session_id(ctx)

    # Over-fetch so client-side grep + pagination have enough material
    fetch_limit = 10000 if return_all else max(page_size * 10, 2000)
    params = {"offset": 0, "limit": fetch_limit}
    if filter:
        params["filter"] = filter

    simplified = simplify_response(safe_get(target_port, "strings", params))

    # Propagate error responses unchanged
    if not isinstance(simplified, dict) or not simplified.get("success", False):
        return simplified
    result_data = simplified.get("result", [])
    if not isinstance(result_data, list):
        return simplified

    query_params = {
        "tool": "data_list_strings",
        "port": target_port,
        "filter": filter,
        "grep": grep
    }
    return paginate_response(
        data=result_data,
        query_params=query_params,
        tool_name="data_list_strings",
        session_id=session_key,
        page_size=page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )
@mcp.tool()
def data_rename(address: str, name: str, port: int = None) -> dict:
    """Rename a data item

    Args:
        address: Memory address in hex format
        name: New name for the data item
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the updated data information
    """
    if not address or not name:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and name parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }
    target_port = _get_instance_port(port)
    return simplify_response(
        safe_post(target_port, "data", {"address": address, "newName": name}))


@mcp.tool()
def data_delete(address: str, port: int = None) -> dict:
    """Delete data at the specified address

    Args:
        address: Memory address in hex format
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }
    target_port = _get_instance_port(port)
    return simplify_response(
        safe_post(target_port, "data/delete", {"address": address, "action": "delete"}))


@mcp.tool()
def data_set_type(address: str, data_type: str, port: int = None) -> dict:
    """Set the data type of a data item

    Args:
        address: Memory address in hex format
        data_type: Data type name (e.g. "uint32_t", "char[10]")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the updated data information
    """
    if not address or not data_type:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and data_type parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }
    target_port = _get_instance_port(port)
    return simplify_response(
        safe_post(target_port, "data/type", {"address": address, "type": data_type}))


# Struct tools
@mcp.tool()
def structs_list(
    category: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List all struct data types in the program with cursor-based pagination

    Args:
        category: Filter by category path (e.g. "/winapi")
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500)
        grep: Regex pattern to filter results (searches struct names)
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all results without pagination (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Structs with pagination metadata and cursor for more results
    """
    target_port = _get_instance_port(port)
    # Fetch up to 10K so client-side cursor pagination has material
    params = {"offset": 0, "limit": 10000}
    if category:
        params["category"] = category

    simplified = simplify_response(safe_get(target_port, "structs", params))
    if not simplified.get("success", False):
        return simplified
    all_structs = simplified.get("result", [])

    # Query params participate in the cursor hash
    query_params = {
        "tool": "structs_list",
        "port": target_port,
        "category": category,
        "grep": grep
    }
    return paginate_response(
        data=all_structs,
        query_params=query_params,
        tool_name="structs_list",
        session_id=_get_session_id(ctx),
        page_size=page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )
@mcp.tool()
def structs_get(
    name: str,
    port: int = None,
    # Pagination parameters (field-based)
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Get detailed information about a specific struct including all fields

    Supports pagination for structs with many fields (e.g., large C++ classes).

    Args:
        name: Struct name
        port: Specific Ghidra instance port (optional)
        page_size: Number of fields per page (default: 50, max: 500)
        grep: Regex pattern to filter fields (matches field name, type, or comment)
        grep_ignorecase: Case-insensitive grep matching (default: True)
        return_all: Return all fields without pagination (WARNING: large structs may have 100+ fields)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Struct details with paginated fields list
    """
    if not name:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Struct name parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }
    port = _get_instance_port(port)
    sid = _get_session_id(ctx)
    params = {"name": name}
    response = safe_get(port, "structs", params)
    simplified = simplify_response(response)
    # Extract struct info and fields for pagination.
    # NOTE: default True (unlike the other tools' default False) — a
    # response without an explicit "success" key is still processed here.
    if not simplified.get("success", True):
        return simplified
    # Fall back to the whole simplified payload when no "result" key exists
    result = simplified.get("result", simplified)
    # Split the payload: "fields" gets paginated, everything else is
    # preserved as struct-level metadata.
    struct_info = {}
    fields = []
    if isinstance(result, dict):
        for key, value in result.items():
            if key == "fields" and isinstance(value, list):
                fields = value
            else:
                struct_info[key] = value
    # Small structs without a grep filter skip pagination entirely
    if len(fields) <= 10 and not grep:
        return simplified
    # Build query params for cursor hashing
    query_params = {
        "tool": "structs_get",
        "port": port,
        "name": name
    }
    # Paginate fields
    paginated = paginate_response(
        data=fields,
        query_params=query_params,
        tool_name="structs_get",
        session_id=sid,
        page_size=page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )
    # Merge struct metadata with paginated fields
    if paginated.get("success"):
        # Secondary keys ("length", "categoryPath") cover alternate
        # server response shapes — TODO confirm against the HATEOAS API
        paginated["struct_name"] = struct_info.get("name", name)
        paginated["struct_size"] = struct_info.get("size", struct_info.get("length"))
        paginated["struct_category"] = struct_info.get("category", struct_info.get("categoryPath"))
        paginated["struct_description"] = struct_info.get("description")
        # The paginated "result" contains the fields
        paginated["fields"] = paginated.pop("result", [])
        # Update message to be struct-specific
        if "_message" in paginated:
            paginated["_message"] = paginated["_message"].replace("items", "fields")
    return paginated
struct_info.get("description") # The paginated "result" contains the fields paginated["fields"] = paginated.pop("result", []) # Update message to be struct-specific if "_message" in paginated: paginated["_message"] = paginated["_message"].replace("items", "fields") return paginated @mcp.tool() def structs_create(name: str, category: str = None, description: str = None, port: int = None) -> dict: """Create a new struct data type Args: name: Name for the new struct category: Category path for the struct (e.g. "/custom") description: Optional description for the struct port: Specific Ghidra instance port (optional) Returns: dict: Created struct information """ if not name: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Struct name parameter is required" }, "timestamp": int(time.time() * 1000) } port = _get_instance_port(port) payload = {"name": name} if category: payload["category"] = category if description: payload["description"] = description response = safe_post(port, "structs/create", payload) return simplify_response(response) @mcp.tool() def structs_add_field(struct_name: str, field_name: str, field_type: str, offset: int = None, comment: str = None, port: int = None) -> dict: """Add a field to an existing struct Args: struct_name: Name of the struct to modify field_name: Name for the new field field_type: Data type for the field (e.g. 
"int", "char", "pointer") offset: Specific offset to insert field (optional, appends to end if not specified) comment: Optional comment for the field port: Specific Ghidra instance port (optional) Returns: dict: Operation result with updated struct size and field information """ if not struct_name or not field_name or not field_type: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "struct_name, field_name, and field_type parameters are required" }, "timestamp": int(time.time() * 1000) } port = _get_instance_port(port) payload = { "struct": struct_name, "fieldName": field_name, "fieldType": field_type } if offset is not None: payload["offset"] = offset if comment: payload["comment"] = comment response = safe_post(port, "structs/addfield", payload) return simplify_response(response) @mcp.tool() def structs_update_field(struct_name: str, field_name: str = None, field_offset: int = None, new_name: str = None, new_type: str = None, new_comment: str = None, port: int = None) -> dict: """Update an existing field in a struct (change name, type, or comment) Args: struct_name: Name of the struct to modify field_name: Name of the field to update (use this OR field_offset) field_offset: Offset of the field to update (use this OR field_name) new_name: New name for the field (optional) new_type: New data type for the field (optional, e.g. 
"int", "pointer") new_comment: New comment for the field (optional) port: Specific Ghidra instance port (optional) Returns: dict: Operation result with old and new field values """ if not struct_name: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "struct_name parameter is required" }, "timestamp": int(time.time() * 1000) } if not field_name and field_offset is None: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Either field_name or field_offset must be provided" }, "timestamp": int(time.time() * 1000) } if not new_name and not new_type and new_comment is None: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "At least one of new_name, new_type, or new_comment must be provided" }, "timestamp": int(time.time() * 1000) } port = _get_instance_port(port) payload = {"struct": struct_name} if field_name: payload["fieldName"] = field_name if field_offset is not None: payload["fieldOffset"] = field_offset if new_name: payload["newName"] = new_name if new_type: payload["newType"] = new_type if new_comment is not None: payload["newComment"] = new_comment response = safe_post(port, "structs/updatefield", payload) return simplify_response(response) @mcp.tool() def structs_delete(name: str, port: int = None) -> dict: """Delete a struct data type Args: name: Name of the struct to delete port: Specific Ghidra instance port (optional) Returns: dict: Operation result confirming deletion """ if not name: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Struct name parameter is required" }, "timestamp": int(time.time() * 1000) } port = _get_instance_port(port) payload = {"name": name} response = safe_post(port, "structs/delete", payload) return simplify_response(response) # Analysis tools @mcp.tool() def analysis_run(port: int = None, analysis_options: dict = None) -> dict: """Run analysis on the current program Args: analysis_options: Dictionary of analysis 
options to enable/disable (e.g. {"functionRecovery": True, "dataRefs": False}) port: Specific Ghidra instance port (optional) Returns: dict: Analysis operation result with status """ port = _get_instance_port(port) response = safe_post(port, "analysis", analysis_options or {}) return simplify_response(response) @mcp.tool() def analysis_get_callgraph( name: str = None, address: str = None, max_depth: int = 3, port: int = None, # Pagination parameters page_size: int = DEFAULT_PAGE_SIZE, grep: str = None, grep_ignorecase: bool = True, return_all: bool = False, ctx: Context = None ) -> dict: """Get function call graph with cursor-based pagination on edges Args: name: Starting function name (mutually exclusive with address) address: Starting function address (mutually exclusive with name) max_depth: Maximum call depth to analyze (default: 3) port: Specific Ghidra instance port (optional) page_size: Edges per page (default: 50, max: 500) grep: Regex pattern to filter edges (e.g., "malloc|free", "FUN_00") grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all edges without pagination ctx: FastMCP context (auto-injected) Returns: dict: Call graph with paginated edges. Use cursor_next(cursor_id) for more. 
Examples: # Get callgraph, filter for memory functions analysis_get_callgraph(name="main", grep="alloc|free|memcpy") # Deep analysis with pagination analysis_get_callgraph(name="main", max_depth=10, page_size=100) """ port_to_use = _get_instance_port(port) params = {"max_depth": max_depth} # Explicitly pass either name or address parameter based on what was provided if address: params["address"] = address func_id = address elif name: params["name"] = name func_id = name else: func_id = "entry_point" # If neither is provided, the Java endpoint will use the entry point response = safe_get(port_to_use, "analysis/callgraph", params) simplified = simplify_response(response) if not simplified.get("success", False): return simplified # Extract graph data - typically has nodes and edges result = simplified.get("result", {}) edges = result.get("edges", []) if isinstance(result, dict) else [] nodes = result.get("nodes", []) if isinstance(result, dict) else [] if not edges: return simplified # Return as-is if no edges # Build query params for cursor hashing query_params = { "tool": "analysis_get_callgraph", "port": port_to_use, "name": name, "address": address, "max_depth": max_depth, "grep": grep } sid = _get_session_id(ctx) # Paginate edges (nodes are typically smaller, include all) paginated = paginate_response( data=edges, query_params=query_params, tool_name="analysis_get_callgraph", session_id=sid, page_size=min(page_size, MAX_PAGE_SIZE), grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all ) # Reconstruct result with paginated edges if paginated.get("success"): paginated["result"] = { "root_function": func_id, "max_depth": max_depth, "nodes": nodes, # Include all nodes for context "edges": paginated.get("result", []), "total_nodes": len(nodes), } return paginated @mcp.tool() def analysis_get_dataflow( address: str, direction: str = "forward", max_steps: int = 50, port: int = None, # Pagination parameters page_size: int = DEFAULT_PAGE_SIZE, grep: str = 
None, grep_ignorecase: bool = True, return_all: bool = False, ctx: Context = None ) -> dict: """Perform data flow analysis with cursor-based pagination on steps Args: address: Starting address in hex format direction: "forward" or "backward" (default: "forward") max_steps: Maximum analysis steps (default: 50) port: Specific Ghidra instance port (optional) page_size: Steps per page (default: 50, max: 500) grep: Regex pattern to filter steps (e.g., "MOV|LEA", "EAX|RAX") grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all steps without pagination ctx: FastMCP context (auto-injected) Returns: dict: Data flow steps with pagination. Use cursor_next(cursor_id) for more. Examples: # Track data flow, filter for memory operations analysis_get_dataflow(address="0x401000", grep="MOV|PUSH|POP") # Backward flow to find data sources analysis_get_dataflow(address="0x401000", direction="backward", grep="LEA|MOV") """ if not address: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Address parameter is required" }, "timestamp": int(time.time() * 1000) } port_to_use = _get_instance_port(port) params = { "address": address, "direction": direction, "max_steps": max_steps } response = safe_get(port_to_use, "analysis/dataflow", params) simplified = simplify_response(response) if not simplified.get("success", False): return simplified # Extract dataflow steps result = simplified.get("result", {}) steps = result.get("steps", []) if isinstance(result, dict) else [] if not steps: return simplified # Return as-is if no steps # Build query params for cursor hashing query_params = { "tool": "analysis_get_dataflow", "port": port_to_use, "address": address, "direction": direction, "max_steps": max_steps, "grep": grep } sid = _get_session_id(ctx) # Paginate steps paginated = paginate_response( data=steps, query_params=query_params, tool_name="analysis_get_dataflow", session_id=sid, page_size=min(page_size, MAX_PAGE_SIZE), grep=grep, 
grep_ignorecase=grep_ignorecase, return_all=return_all ) # Reconstruct result with paginated steps if paginated.get("success"): paginated["result"] = { "start_address": address, "direction": direction, "steps": paginated.get("result", []), } # Preserve other metadata if isinstance(result, dict): for key in ["sources", "sinks", "total_steps"]: if key in result: paginated["result"][key] = result[key] return paginated @mcp.tool() def ui_get_current_address(port: int = None) -> dict: """Get the address currently selected in Ghidra's UI Args: port: Specific Ghidra instance port (optional) Returns: Dict containing address information or error """ port = _get_instance_port(port) response = safe_get(port, "address") return simplify_response(response) @mcp.tool() def ui_get_current_function(port: int = None) -> dict: """Get the function currently selected in Ghidra's UI Args: port: Specific Ghidra instance port (optional) Returns: Dict containing function information or error """ port = _get_instance_port(port) response = safe_get(port, "function") return simplify_response(response) @mcp.tool() def comments_set(address: str, comment: str = "", comment_type: str = "plate", port: int = None) -> dict: """Set a comment at the specified address Args: address: Memory address in hex format comment: Comment text (empty string removes comment) comment_type: Type of comment - "plate", "pre", "post", "eol", "repeatable" (default: "plate") port: Specific Ghidra instance port (optional) Returns: dict: Operation result """ if not address: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Address parameter is required" }, "timestamp": int(time.time() * 1000) } port = _get_instance_port(port) payload = { "comment": comment } response = safe_post(port, f"memory/{address}/comments/{comment_type}", payload) return simplify_response(response) @mcp.tool() def functions_set_comment(address: str, comment: str = "", port: int = None) -> dict: """Set a 
decompiler-friendly comment (tries function comment, falls back to pre-comment) Args: address: Memory address in hex format (preferably function entry point) comment: Comment text (empty string removes comment) port: Specific Ghidra instance port (optional) Returns: dict: Operation result """ if not address: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Address parameter is required" }, "timestamp": int(time.time() * 1000) } port_to_use = _get_instance_port(port) # Try setting as a function comment first using PATCH try: func_patch_payload = { "comment": comment } patch_response = safe_patch(port_to_use, f"functions/{address}", func_patch_payload) if patch_response.get("success", False): return simplify_response(patch_response) # Success setting function comment else: print(f"Note: Failed to set function comment via PATCH on {address}, falling back. Error: {patch_response.get('error')}", file=sys.stderr) except Exception as e: print(f"Exception trying function comment PATCH: {e}. Falling back.", file=sys.stderr) # Fall through to set pre-comment if PATCH fails # Fallback: Set as a "pre" comment using the comments_set tool print(f"Falling back to setting 'pre' comment for address {address}", file=sys.stderr) return comments_set(address=address, comment=comment, comment_type="pre", port=port_to_use) # ================= Startup ================= def main(): register_instance(DEFAULT_GHIDRA_PORT, f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}") # Use quick discovery on startup _discover_instances(QUICK_DISCOVERY_RANGE) # Start background discovery thread discovery_thread = threading.Thread( target=periodic_discovery, daemon=True, name="GhydraMCP-Discovery" ) discovery_thread.start() signal.signal(signal.SIGINT, handle_sigint) mcp.run(transport="stdio") if __name__ == "__main__": main()