diff --git a/CHANGELOG.md b/CHANGELOG.md index b9c325b..91a3065 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,61 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ## [Unreleased] +## [2025.12.1] - 2025-12-01 + +### Added +- **Cursor-Based Pagination System:** Implemented efficient pagination for large responses (10K+ items) without filling context windows. + - `page_size` parameter (default: 50, max: 500) for controlling items per page + - `cursor_id` returned for navigating to subsequent pages + - Session isolation prevents cursor cross-contamination between MCP clients + - TTL-based cursor expiration (5 minutes) with LRU eviction (max 100 cursors) +- **Grep/Regex Filtering:** Added `grep` and `grep_ignorecase` parameters to filter results with regex patterns before pagination. +- **Bypass Option:** Added `return_all` parameter to retrieve complete datasets (with large response warnings). +- **Cursor Management Tools:** New MCP tools for cursor lifecycle management: + - `cursor_next(cursor_id)` - Fetch next page of results + - `cursor_list()` - List active cursors for current session + - `cursor_delete(cursor_id)` - Delete specific cursor + - `cursor_delete_all()` - Delete all session cursors +- **Enumeration Resources:** New lightweight MCP resources for quick data enumeration (more efficient than tool calls): + - `/instances` - List all active Ghidra instances + - `/instance/{port}/summary` - Program overview with statistics + - `/instance/{port}/functions` - List functions (capped at 1000) + - `/instance/{port}/strings` - List strings (capped at 500) + - `/instance/{port}/data` - List data items (capped at 1000) + - `/instance/{port}/structs` - List struct types (capped at 500) + - `/instance/{port}/xrefs/to/{address}` - Cross-references to an address + - `/instance/{port}/xrefs/from/{address}` - Cross-references from an address + +### Changed +- **MCP Dependency Upgrade:** Updated from `mcp==1.6.0` to `mcp>=1.22.0` 
for FastMCP Context support. +- **Version Strategy:** Switched to date-based versioning (YYYY.MM.D format). +- **Tool Updates:** 11 tools now support pagination with grep filtering: + - `functions_list` - List functions with pagination + - `functions_decompile` - Decompiled code with line pagination (grep for code patterns) + - `functions_disassemble` - Assembly with instruction pagination (grep for opcodes) + - `functions_get_variables` - Function variables with pagination + - `data_list` - List data items with pagination + - `data_list_strings` - List strings with pagination + - `xrefs_list` - List cross-references with pagination + - `structs_list` - List struct types with pagination + - `structs_get` - Struct fields with pagination (grep for field names/types) + - `analysis_get_callgraph` - Call graph edges with pagination + - `analysis_get_dataflow` - Data flow steps with pagination +- **LLM-Friendly Responses:** Added prominent `_message` field to guide LLMs on cursor continuation. + +### Fixed +- **FastMCP Compatibility:** Removed deprecated `version` parameter from FastMCP constructor. + +### Security +- **ReDoS Protection:** Added validation for grep regex patterns to prevent catastrophic backtracking attacks. + - Pattern length limit (500 chars) + - Repetition operator limit (15 max) + - Detection of dangerous nested quantifier patterns like `(a+)+` +- **Session Spoofing Prevention:** Removed user-controllable `session_id` parameter from all tools. + - Sessions now derived from FastMCP context (`ctx.session`, `ctx.client_id`) + - Prevents users from accessing or manipulating other sessions' cursors +- **Recursion Depth Limit:** Added depth limit (10) to grep matching to prevent stack overflow on deeply nested data. + ## [2.0.0] - 2025-11-11 ### Added @@ -117,7 +172,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). 
- Initial project setup - Basic MCP bridge functionality -[unreleased]: https://github.com/teal-bauer/GhydraMCP/compare/v2.0.0...HEAD +[unreleased]: https://github.com/teal-bauer/GhydraMCP/compare/v2025.12.1...HEAD +[2025.12.1]: https://github.com/teal-bauer/GhydraMCP/compare/v2.0.0...v2025.12.1 [2.0.0]: https://github.com/teal-bauer/GhydraMCP/compare/v1.4.0...v2.0.0 [1.4.0]: https://github.com/teal-bauer/GhydraMCP/compare/v1.3.0...v1.4.0 [1.3.0]: https://github.com/teal-bauer/GhydraMCP/compare/v1.2...v1.3.0 diff --git a/bridge_mcp_hydra.py b/bridge_mcp_hydra.py index af14c1f..bc6401e 100644 --- a/bridge_mcp_hydra.py +++ b/bridge_mcp_hydra.py @@ -1,12 +1,13 @@ # /// script # requires-python = ">=3.11" # dependencies = [ -# "mcp==1.6.0", -# "requests==2.32.3", +# "mcp>=1.22.0", +# "requests>=2.32.3", # ] # /// # GhydraMCP Bridge for Ghidra HATEOAS API - Optimized for MCP integration # Provides namespaced tools for interacting with Ghidra's reverse engineering capabilities +# Features: Cursor-based pagination, grep filtering, session isolation import os import signal import sys @@ -17,7 +18,7 @@ from typing import Dict, List, Optional, Union, Any from urllib.parse import quote, urlencode, urlparse import requests -from mcp.server.fastmcp import FastMCP +from mcp.server.fastmcp import FastMCP, Context # ================= Core Infrastructure ================= @@ -31,11 +32,552 @@ DEFAULT_GHIDRA_HOST = "localhost" QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10) FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20) -BRIDGE_VERSION = "v2.1.0" +BRIDGE_VERSION = "2025-12-01" REQUIRED_API_VERSION = 2010 current_instance_port = DEFAULT_GHIDRA_PORT +# ================= Cursor-Based Pagination System ================= +# Provides efficient pagination with grep filtering for large responses +# Inspired by mcplaywright pagination system + +import re +import hashlib +import json +from dataclasses import dataclass, field +from typing 
# ================= Cursor-Based Pagination System =================
# Efficient in-process pagination with optional grep filtering for large
# responses. Cursors expire CURSOR_TTL_SECONDS after last access and the
# cache is LRU-bounded at CURSOR_MAX_CACHE_SIZE entries.

# Configuration
CURSOR_TTL_SECONDS = 300        # 5 minutes since last access
CURSOR_MAX_CACHE_SIZE = 100     # Maximum number of cached cursors (LRU bound)
DEFAULT_PAGE_SIZE = 50
MAX_PAGE_SIZE = 500
TOKEN_ESTIMATION_RATIO = 4.0    # Roughly 4 chars per token (heuristic)

# ReDoS Protection Configuration
MAX_GREP_PATTERN_LENGTH = 500   # Maximum regex pattern length
MAX_GREP_REPETITION_OPS = 15    # Maximum repetition operators (* + ? {})
MAX_GREP_RECURSION_DEPTH = 10   # Maximum depth for nested data grep matching


def compile_safe_pattern(pattern: str, flags: int = 0) -> re.Pattern:
    """Compile regex pattern with ReDoS protection.

    Validates pattern to prevent catastrophic backtracking attacks.
    Rejects patterns that are too long, have excessive repetition
    operators, or contain nested quantifiers like ``(a+)+``.

    Args:
        pattern: Regex pattern string
        flags: Regex compilation flags

    Returns:
        Compiled regex pattern

    Raises:
        ValueError: If pattern fails safety validation
    """
    if not pattern:
        raise ValueError("Empty pattern")

    # Check pattern length
    if len(pattern) > MAX_GREP_PATTERN_LENGTH:
        raise ValueError(
            f"Pattern too long ({len(pattern)} chars, max {MAX_GREP_PATTERN_LENGTH}). "
            "Consider using a simpler pattern or substring match."
        )

    # Count repetition operators on the pattern with escape sequences
    # stripped, so literal matches such as \+ or \* are not miscounted
    # as quantifiers (counting them would falsely reject safe patterns).
    unescaped = re.sub(r'\\.', '', pattern)
    repetition_ops = unescaped.count('*') + unescaped.count('+') + unescaped.count('?')
    # Also count bounded repetitions {n,m}
    repetition_ops += len(re.findall(r'\{[0-9,]+\}', unescaped))

    if repetition_ops > MAX_GREP_REPETITION_OPS:
        raise ValueError(
            f"Pattern has too many repetition operators ({repetition_ops}, max {MAX_GREP_REPETITION_OPS}). "
            "This could cause performance issues. Consider simplifying the pattern."
        )

    # Check for common dangerous patterns (nested quantifiers)
    dangerous_patterns = [
        r'\([^)]*[*+][^)]*\)[*+]',  # (a+)+ or (a*)*
        r'\([^)]*[*+][^)]*\)\{',    # (a+){n,m}
    ]
    for dangerous in dangerous_patterns:
        if re.search(dangerous, pattern):
            raise ValueError(
                "Pattern contains nested quantifiers which could cause exponential backtracking. "
                "Example: (a+)+ is dangerous. Consider using atomic groups or simplifying."
            )

    # Try to compile the pattern; chain the original error for debuggability
    try:
        return re.compile(pattern, flags)
    except re.error as e:
        raise ValueError(f"Invalid regex pattern: {e}") from e


@dataclass
class CursorState:
    """Represents the state of a paginated query with session isolation."""
    cursor_id: str                  # Unique cursor identifier
    session_id: str                 # Session isolation key
    tool_name: str                  # Tool that created this cursor
    query_hash: str                 # Hash of original query parameters
    data: List[Any]                 # Full (possibly grep-filtered) result set
    total_count: int                # Total items before grep filtering
    filtered_count: int             # Items after grep filtering
    current_offset: int = 0
    page_size: int = DEFAULT_PAGE_SIZE
    grep_pattern: Optional[str] = None  # was annotated `str`; None is the default
    grep_flags: int = 0
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)

    @property
    def is_expired(self) -> bool:
        """True once CURSOR_TTL_SECONDS have passed since last access."""
        return time.time() - self.last_accessed > CURSOR_TTL_SECONDS

    @property
    def has_more(self) -> bool:
        """True if items remain beyond the current page."""
        return self.current_offset + self.page_size < self.filtered_count

    @property
    def current_page(self) -> int:
        """1-based page number derived from the current offset."""
        return (self.current_offset // self.page_size) + 1

    @property
    def total_pages(self) -> int:
        """Total page count (at least 1, even for empty result sets)."""
        return max(1, (self.filtered_count + self.page_size - 1) // self.page_size)

    @property
    def ttl_remaining(self) -> int:
        """Seconds until this cursor expires (0 if already expired)."""
        return max(0, int(CURSOR_TTL_SECONDS - (time.time() - self.last_accessed)))

    def verify_session(self, session_id: str) -> bool:
        """Verify cursor belongs to requesting session."""
        return self.session_id == session_id


class CursorManager:
    """Thread-safe cursor manager with TTL-based expiration and session isolation."""

    def __init__(self):
        # OrderedDict gives us LRU ordering via move_to_end/popitem(last=False)
        self._cursors: OrderedDict[str, CursorState] = OrderedDict()
        self._session_cursors: Dict[str, set] = {}  # session_id -> set of cursor_ids
        self._lock = Lock()

    def _generate_cursor_id(self, query_hash: str, session_id: str) -> str:
        """Generate a unique cursor ID."""
        unique = f"{session_id}-{query_hash}-{time.time()}-{id(self)}"
        return hashlib.sha256(unique.encode()).hexdigest()[:16]

    def _untrack(self, cursor_id: str, session_id: str) -> None:
        """Remove a cursor id from session tracking (call while holding lock)."""
        if session_id in self._session_cursors:
            self._session_cursors[session_id].discard(cursor_id)

    def _cleanup_expired(self):
        """Remove expired cursors and enforce LRU bound (call while holding lock)."""
        expired = [cid for cid, state in self._cursors.items() if state.is_expired]
        for cid in expired:
            self._untrack(cid, self._cursors[cid].session_id)
            del self._cursors[cid]

        # Also enforce max cache size (LRU eviction of least-recently-used)
        while len(self._cursors) > CURSOR_MAX_CACHE_SIZE:
            oldest_id, oldest_state = self._cursors.popitem(last=False)
            self._untrack(oldest_id, oldest_state.session_id)

    def create_cursor(self, data: List[Any], query_params: dict,
                      tool_name: str = "unknown",
                      session_id: str = "default",
                      grep_pattern: str = None, grep_flags: int = 0,
                      page_size: int = DEFAULT_PAGE_SIZE) -> tuple[str, CursorState]:
        """Create a new cursor for paginated results.

        Args:
            data: The full result set to paginate
            query_params: Original query parameters (for hashing)
            tool_name: Name of tool creating cursor
            session_id: Session identifier for isolation
            grep_pattern: Optional regex pattern to filter results
            grep_flags: Regex flags (re.IGNORECASE, etc.)
            page_size: Items per page (clamped to [1, MAX_PAGE_SIZE])

        Returns:
            Tuple of (cursor_id, cursor_state)

        Raises:
            ValueError: If grep_pattern fails ReDoS-safety validation
        """
        # Apply grep filtering if pattern provided (with ReDoS protection)
        filtered_data = data
        if grep_pattern:
            pattern = compile_safe_pattern(grep_pattern, grep_flags)
            filtered_data = [
                item for item in data
                if self._matches_grep(item, pattern)
            ]

        # Create query hash for deduplication (not security-sensitive)
        query_hash = hashlib.md5(
            json.dumps(query_params, sort_keys=True, default=str).encode()
        ).hexdigest()[:12]

        with self._lock:
            self._cleanup_expired()

            cursor_id = self._generate_cursor_id(query_hash, session_id)
            state = CursorState(
                cursor_id=cursor_id,
                session_id=session_id,
                tool_name=tool_name,
                query_hash=query_hash,
                data=filtered_data,
                total_count=len(data),
                filtered_count=len(filtered_data),
                # Clamp to at least 1: page_size <= 0 would make the
                # current_page/total_pages properties divide by zero.
                page_size=max(1, min(page_size, MAX_PAGE_SIZE)),
                grep_pattern=grep_pattern,
                grep_flags=grep_flags
            )
            self._cursors[cursor_id] = state

            # Track by session
            self._session_cursors.setdefault(session_id, set()).add(cursor_id)

            return cursor_id, state

    def get_cursor(self, cursor_id: str, session_id: str = None) -> Optional[CursorState]:
        """Retrieve a cursor by ID, optionally validating session.

        Args:
            cursor_id: The cursor identifier
            session_id: Optional session to validate against

        Returns:
            CursorState if found and valid, None otherwise
        """
        with self._lock:
            self._cleanup_expired()

            state = self._cursors.get(cursor_id)
            if state is None:
                return None

            if state.is_expired:
                del self._cursors[cursor_id]
                self._untrack(cursor_id, state.session_id)
                return None

            # Validate session if provided
            if session_id and not state.verify_session(session_id):
                return None

            state.last_accessed = time.time()
            # Move to end (most recently used)
            self._cursors.move_to_end(cursor_id)
            return state

    def advance_cursor(self, cursor_id: str, session_id: str = None) -> Optional[CursorState]:
        """Advance cursor to next page.

        Args:
            cursor_id: The cursor identifier
            session_id: Optional session to validate against

        Returns:
            Updated CursorState or None if invalid/expired
        """
        with self._lock:
            state = self._cursors.get(cursor_id)
            if not state:
                return None

            if state.is_expired:
                # Drop the stale cursor instead of leaving it tracked
                del self._cursors[cursor_id]
                self._untrack(cursor_id, state.session_id)
                return None

            if session_id and not state.verify_session(session_id):
                return None

            state.current_offset += state.page_size
            state.last_accessed = time.time()
            self._cursors.move_to_end(cursor_id)
            return state

    def delete_cursor(self, cursor_id: str, session_id: str = None) -> bool:
        """Explicitly delete a cursor.

        Args:
            cursor_id: The cursor identifier
            session_id: Optional session to validate against

        Returns:
            True if deleted, False if not found or session mismatch
        """
        with self._lock:
            state = self._cursors.get(cursor_id)
            if state is None:
                return False

            if session_id and not state.verify_session(session_id):
                return False

            self._untrack(cursor_id, state.session_id)
            del self._cursors[cursor_id]
            return True

    def delete_session_cursors(self, session_id: str) -> int:
        """Delete all cursors for a session.

        Args:
            session_id: The session identifier

        Returns:
            Number of cursors deleted
        """
        with self._lock:
            if session_id not in self._session_cursors:
                return 0

            count = 0
            for cid in list(self._session_cursors[session_id]):
                if cid in self._cursors:
                    del self._cursors[cid]
                    count += 1
            del self._session_cursors[session_id]
            return count

    def get_page(self, state: CursorState) -> List[Any]:
        """Get current page of data from cursor state."""
        start = state.current_offset
        return state.data[start:start + state.page_size]

    def _matches_grep(self, item: Any, pattern: re.Pattern, depth: int = 0) -> bool:
        """Check if an item matches the grep pattern.

        Searches through string representations of dict values,
        list items, or the item itself.

        Args:
            item: The item to search
            pattern: Compiled regex pattern
            depth: Current recursion depth (for stack overflow protection)

        Returns:
            True if pattern matches anywhere in the item
        """
        # Prevent stack overflow from deeply nested structures
        if depth > MAX_GREP_RECURSION_DEPTH:
            return False

        if isinstance(item, dict):
            # Search all values in the dict (recursing into containers)
            for value in item.values():
                if isinstance(value, str):
                    if pattern.search(value):
                        return True
                elif isinstance(value, (int, float)):
                    if pattern.search(str(value)):
                        return True
                elif isinstance(value, (dict, list, tuple)):
                    if self._matches_grep(value, pattern, depth + 1):
                        return True
            return False
        elif isinstance(item, (list, tuple)):
            return any(self._matches_grep(i, pattern, depth + 1) for i in item)
        elif isinstance(item, str):
            return bool(pattern.search(item))
        else:
            return bool(pattern.search(str(item)))

    def list_cursors(self, session_id: str = None) -> List[dict]:
        """List active cursors, optionally filtered by session.

        Args:
            session_id: Optional session filter

        Returns:
            List of cursor info dicts
        """
        with self._lock:
            self._cleanup_expired()
            return [
                {
                    "cursor_id": cid,
                    "session_id": state.session_id,
                    "tool_name": state.tool_name,
                    "total_count": state.total_count,
                    "filtered_count": state.filtered_count,
                    "current_page": state.current_page,
                    "total_pages": state.total_pages,
                    "current_offset": state.current_offset,
                    "page_size": state.page_size,
                    "has_more": state.has_more,
                    "grep_pattern": state.grep_pattern,
                    "age_seconds": int(time.time() - state.created_at),
                    "ttl_remaining": state.ttl_remaining
                }
                for cid, state in self._cursors.items()
                if session_id is None or state.session_id == session_id
            ]

    def get_stats(self) -> dict:
        """Get cursor manager statistics."""
        with self._lock:
            self._cleanup_expired()
            return {
                "total_cursors": len(self._cursors),
                "total_sessions": len(self._session_cursors),
                "max_cache_size": CURSOR_MAX_CACHE_SIZE,
                "ttl_seconds": CURSOR_TTL_SECONDS,
                "cursors_per_session": {
                    sid: len(cids) for sid, cids in self._session_cursors.items()
                }
            }


# Global cursor manager instance
cursor_manager = CursorManager()


def estimate_tokens(data: List[Any]) -> int:
    """Estimate token count for a list of items (chars / TOKEN_ESTIMATION_RATIO)."""
    text = json.dumps(data, default=str)
    return int(len(text) / TOKEN_ESTIMATION_RATIO)


def paginate_response(data: List[Any], query_params: dict,
                      tool_name: str = "unknown",
                      session_id: str = "default",
                      page_size: int = DEFAULT_PAGE_SIZE,
                      grep: str = None, grep_ignorecase: bool = True,
                      return_all: bool = False) -> dict:
    """Create a paginated response with optional grep filtering.

    Args:
        data: Full result list to paginate
        query_params: Original query parameters (for cursor creation)
        tool_name: Name of the tool creating this response
        session_id: Session identifier for cursor isolation
        page_size: Items per page (default: 50, max: 500)
        grep: Optional regex pattern to filter results
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Bypass pagination and return all results (with warning)

    Returns:
        dict with pagination metadata and results
    """
    grep_flags = re.IGNORECASE if grep_ignorecase else 0

    # Handle return_all bypass
    if return_all:
        # Apply grep filtering even for return_all
        filtered_data = data
        if grep:
            try:
                pattern = compile_safe_pattern(grep, grep_flags)
                filtered_data = [
                    item for item in data
                    if cursor_manager._matches_grep(item, pattern)
                ]
            except ValueError as e:
                return {
                    "success": False,
                    "error": {
                        "code": "INVALID_GREP_PATTERN",
                        "message": str(e)
                    },
                    "timestamp": int(time.time() * 1000)
                }

        estimated_tokens = estimate_tokens(filtered_data)
        warning = None

        if estimated_tokens > 50000:
            warning = f"🚨 EXTREMELY LARGE response (~{estimated_tokens:,} tokens) - may cause issues"
        elif estimated_tokens > 20000:
            warning = f"⚠️ VERY LARGE response (~{estimated_tokens:,} tokens) - consider using pagination"
        elif estimated_tokens > 8000:
            warning = f"⚠️ Large response (~{estimated_tokens:,} tokens)"

        return {
            "success": True,
            "result": filtered_data,
            "pagination": {
                "bypassed": True,
                "total_count": len(data),
                "filtered_count": len(filtered_data),
                "grep_pattern": grep,
                "estimated_tokens": estimated_tokens,
                "warning": warning
            },
            "timestamp": int(time.time() * 1000)
        }

    # Normal pagination flow
    try:
        cursor_id, state = cursor_manager.create_cursor(
            data=data,
            query_params=query_params,
            tool_name=tool_name,
            session_id=session_id,
            grep_pattern=grep,
            grep_flags=grep_flags,
            page_size=page_size
        )
    except ValueError as e:
        return {
            "success": False,
            "error": {
                "code": "INVALID_GREP_PATTERN",
                "message": str(e)
            },
            "timestamp": int(time.time() * 1000)
        }

    current_page = cursor_manager.get_page(state)

    # Only include cursor_id if there are more pages
    response_cursor = cursor_id if state.has_more else None

    # Build response with prominent continuation message for LLMs
    response = {
        "success": True,
        "result": current_page,
        "pagination": {
            "cursor_id": response_cursor,
            "session_id": session_id,
            "total_count": state.total_count,
            "filtered_count": state.filtered_count,
            "page_size": state.page_size,
            "current_page": state.current_page,
            "total_pages": state.total_pages,
            "has_more": state.has_more,
            "grep_pattern": grep,
            "items_returned": len(current_page),
        },
        "timestamp": int(time.time() * 1000)
    }

    # Add prominent message for LLMs when more data is available
    if state.has_more:
        remaining = state.filtered_count - (state.current_page * state.page_size)
        response["_message"] = (
            f"📄 Showing {len(current_page)} of {state.filtered_count} items "
            f"(page {state.current_page}/{state.total_pages}). "
            f"To get the next {min(state.page_size, remaining)} items, call: "
            f"cursor_next(cursor_id='{cursor_id}')"
        )
    else:
        response["_message"] = f"✅ Complete: {len(current_page)} items returned (all results)"

    return response


# ================= End Cursor System =================
+- `data_list` - List data items +- `data_list_strings` - List string data +- `xrefs_list` - List cross-references (can be very large for common functions) +- `structs_list` - List struct types +- `structs_get` - Struct fields (grep for field names/types in large structs) +- `analysis_get_callgraph` - Call graph edges (grep for function names) +- `analysis_get_dataflow` - Data flow steps (grep for opcodes/registers) + +Pagination parameters: +- `page_size`: Items per page (default: 50, max: 500) +- `grep`: Regex pattern to filter results (e.g., "main|init", "FUN_00.*") +- `grep_ignorecase`: Case-insensitive grep (default: True) +- `return_all`: Bypass pagination and return all results (use with caution) + +When results are paginated, the response includes a `_message` field with instructions. +Use `cursor_next(cursor_id)` to fetch the next page of results. +Use `cursor_list()` to see active cursors. +Use `cursor_delete(cursor_id)` to clean up cursors. """ -mcp = FastMCP("GhydraMCP", version=BRIDGE_VERSION, instructions=instructions) +mcp = FastMCP("GhydraMCP", instructions=instructions) ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST) @@ -939,6 +1507,382 @@ def disassembly_by_name(port: int = None, name: str = None) -> str: return "Error: Could not extract disassembly from response" + +# ================= Enumeration Resources ================= +# Lightweight read-only resources for listing/enumerating Ghidra data +# More efficient than tool calls for simple data access + +@mcp.resource(uri="/instances") +def resource_instances_list() -> dict: + """List all active Ghidra instances + + Returns a lightweight summary of available instances for quick enumeration. + Use the /instance/{port} resource for detailed program info. 
def _unwrap_items(simplified: dict, key: str) -> list:
    """Extract the item list from a simplified HATEOAS response.

    The backend may return the list directly under "result", under a
    named key ("functions", "strings", ...), or nested one level deeper
    (e.g. {"functions": {"functions": [...]}}). Normalizes all three
    shapes to a plain list (empty list if nothing usable is found).
    """
    items = simplified.get("result", simplified.get(key, []))
    if isinstance(items, dict):
        items = items.get(key, [])
    return items if isinstance(items, list) else []


@mcp.resource(uri="/instance/{port}/functions")
def resource_functions_list(port: int = None) -> dict:
    """List all functions in the program (lightweight enumeration)

    Returns function names and addresses for quick reference.
    This is a read-only resource - use functions_list tool for filtering/pagination.

    Args:
        port: Ghidra instance port

    Returns:
        dict: List of functions with name, address, and size
    """
    port = _get_instance_port(port)

    # Fetch functions from Ghidra (capped for resource efficiency)
    simplified = simplify_response(safe_get(port, "functions", {"limit": 1000}))
    if not simplified.get("success", True):
        return simplified

    functions = _unwrap_items(simplified, "functions")

    # Keep just the essential fields
    func_list = [
        {
            "name": f.get("name", "unknown"),
            "address": f.get("entryPoint", f.get("address", "")),
            "size": f.get("size", 0),
        }
        for f in functions[:1000]  # Hard cap
        if isinstance(f, dict)
    ]

    return {
        "functions": func_list,
        "count": len(func_list),
        "truncated": len(functions) > 1000,
        "_hint": "Use functions_list tool for filtering and pagination of large lists"
    }


@mcp.resource(uri="/instance/{port}/strings")
def resource_strings_list(port: int = None) -> dict:
    """List defined strings in the program (lightweight enumeration)

    Returns string values and addresses for quick reference.
    Use data_list_strings tool for filtering/pagination.

    Args:
        port: Ghidra instance port

    Returns:
        dict: List of strings with address and value
    """
    port = _get_instance_port(port)

    # Strings can be verbose, cap lower than other resources
    simplified = simplify_response(safe_get(port, "strings", {"limit": 500}))
    if not simplified.get("success", True):
        return simplified

    strings = _unwrap_items(simplified, "strings")

    string_list = []
    for s in strings[:500]:
        if not isinstance(s, dict):
            continue
        # Coerce to str and guard against a None "value" from the backend:
        # slicing None (the previous code) would raise TypeError.
        value = str(s.get("value", s.get("string", "")) or "")
        string_list.append({
            "address": s.get("address", ""),
            "value": value[:200],  # Truncate long strings
            # Fall back to the length of the same value we display,
            # not a re-lookup of only the "value" key.
            "length": s.get("length", len(value)),
        })

    return {
        "strings": string_list,
        "count": len(string_list),
        "truncated": len(strings) > 500,
        "_hint": "Use data_list_strings tool for full strings and pagination"
    }


@mcp.resource(uri="/instance/{port}/data")
def resource_data_list(port: int = None) -> dict:
    """List defined data items in the program (lightweight enumeration)

    Returns data labels, addresses, and types for quick reference.
    Use data_list tool for filtering/pagination.

    Args:
        port: Ghidra instance port

    Returns:
        dict: List of data items with address, name, and type
    """
    port = _get_instance_port(port)

    simplified = simplify_response(safe_get(port, "data", {"limit": 1000}))
    if not simplified.get("success", True):
        return simplified

    data_items = _unwrap_items(simplified, "data")

    data_list = [
        {
            "address": d.get("address", ""),
            "name": d.get("name", d.get("label", "")),
            "type": d.get("type", d.get("dataType", "")),
        }
        for d in data_items[:1000]
        if isinstance(d, dict)
    ]

    return {
        "data": data_list,
        "count": len(data_list),
        "truncated": len(data_items) > 1000,
        "_hint": "Use data_list tool for filtering and pagination"
    }


@mcp.resource(uri="/instance/{port}/structs")
def resource_structs_list(port: int = None) -> dict:
    """List defined struct types in the program (lightweight enumeration)

    Returns struct names, sizes, and categories for quick reference.
    Use structs_list tool for filtering/pagination, structs_get for fields.

    Args:
        port: Ghidra instance port

    Returns:
        dict: List of structs with name, size, and category
    """
    port = _get_instance_port(port)

    simplified = simplify_response(safe_get(port, "structs", {"limit": 500}))
    if not simplified.get("success", True):
        return simplified

    structs = _unwrap_items(simplified, "structs")

    struct_list = [
        {
            "name": s.get("name", ""),
            "size": s.get("size", s.get("length", 0)),
            "category": s.get("category", s.get("categoryPath", "")),
        }
        for s in structs[:500]
        if isinstance(s, dict)
    ]

    return {
        "structs": struct_list,
        "count": len(struct_list),
        "truncated": len(structs) > 500,
        "_hint": "Use structs_list tool for pagination, structs_get for field details"
    }


@mcp.resource(uri="/instance/{port}/xrefs/to/{address}")
def resource_xrefs_to(port: int = None, address: str = None) -> dict:
    """List cross-references TO an address (lightweight enumeration)

    Returns references pointing to the specified address.
    Use xrefs_list tool for full filtering/pagination.

    Args:
        port: Ghidra instance port
        address: Target address in hex format

    Returns:
        dict: List of references to this address
    """
    if not address:
        return {"error": "Address parameter required"}

    port = _get_instance_port(port)

    simplified = simplify_response(
        safe_get(port, "xrefs", {"toAddress": address, "limit": 200}))
    if not simplified.get("success", True):
        return simplified

    xrefs = _unwrap_items(simplified, "xrefs")

    xref_list = [
        {
            "from": x.get("fromAddress", x.get("from", "")),
            "type": x.get("refType", x.get("type", "")),
            # `or ""` covers both a missing key and an explicit None context
            "context": (x.get("context") or "")[:100],
        }
        for x in xrefs[:200]
        if isinstance(x, dict)
    ]

    return {
        "to_address": address,
        "references": xref_list,
        "count": len(xref_list),
        "truncated": len(xrefs) > 200,
        "_hint": "Use xrefs_list tool for full filtering and pagination"
    }


@mcp.resource(uri="/instance/{port}/xrefs/from/{address}")
def resource_xrefs_from(port: int = None, address: str = None) -> dict:
    """List cross-references FROM an address (lightweight enumeration)

    Returns references originating from the specified address.
    Use xrefs_list tool for full filtering/pagination.

    Args:
        port: Ghidra instance port
        address: Source address in hex format

    Returns:
        dict: List of references from this address
    """
    if not address:
        return {"error": "Address parameter required"}

    port = _get_instance_port(port)

    simplified = simplify_response(
        safe_get(port, "xrefs", {"fromAddress": address, "limit": 200}))
    if not simplified.get("success", True):
        return simplified

    xrefs = _unwrap_items(simplified, "xrefs")

    xref_list = [
        {
            "to": x.get("toAddress", x.get("to", "")),
            "type": x.get("refType", x.get("type", "")),
            "context": (x.get("context") or "")[:100],
        }
        for x in xrefs[:200]
        if isinstance(x, dict)
    ]

    return {
        "from_address": address,
        "references": xref_list,
        "count": len(xref_list),
        "truncated": len(xrefs) > 200,
        "_hint": "Use xrefs_list tool for full filtering and pagination"
    }
+ + Args: + port: Ghidra instance port + + Returns: + dict: Program summary with statistics + """ + port = _get_instance_port(port) + + # Get basic program info + program_info = ghidra_instance(port=port) + if "error" in program_info: + return program_info + + # Get counts (lightweight queries) + summary = { + "program": program_info, + "statistics": {} + } + + # Function count + try: + fn_response = safe_get(port, "functions", {"limit": 1}) + if isinstance(fn_response, dict): + total = fn_response.get("result", {}).get("total", 0) + if not total: + total = fn_response.get("total", 0) + summary["statistics"]["functions"] = total + except Exception: + summary["statistics"]["functions"] = "unknown" + + # String count + try: + str_response = safe_get(port, "strings", {"limit": 1}) + if isinstance(str_response, dict): + total = str_response.get("result", {}).get("total", 0) + if not total: + total = str_response.get("total", 0) + summary["statistics"]["strings"] = total + except Exception: + summary["statistics"]["strings"] = "unknown" + + # Data count + try: + data_response = safe_get(port, "data", {"limit": 1}) + if isinstance(data_response, dict): + total = data_response.get("result", {}).get("total", 0) + if not total: + total = data_response.get("total", 0) + summary["statistics"]["data_items"] = total + except Exception: + summary["statistics"]["data_items"] = "unknown" + + summary["_hint"] = "Use /instance/{port}/functions, /strings, /data for listings" + + return summary + + # ================= MCP Prompts ================= # Prompts define reusable templates for LLM interactions @@ -1272,51 +2216,300 @@ def instances_use(port: int) -> str: @mcp.tool() def instances_current() -> dict: """Get information about the current working Ghidra instance - + Returns: dict: Details about the current instance and program """ return ghidra_instance(port=current_instance_port) + +# ================= Cursor Management Tools ================= +# Tools for managing pagination 
cursors with session isolation + +def _get_session_id(ctx: Context = None) -> str: + """Get session ID from FastMCP context + + Uses the session object's id() for reliable session tracking. + The session object persists across tool calls within the same MCP connection. + + Security: This function does NOT accept manual session_id overrides + to prevent session spoofing attacks. + """ + if ctx: + # Try to get client_id first (explicitly provided by client) + if hasattr(ctx, 'client_id') and ctx.client_id: + return f"client-{ctx.client_id}" + + # Use session object's memory id as unique session identifier + # This persists across tool calls within the same MCP connection + if hasattr(ctx, 'session') and ctx.session: + return f"session-{id(ctx.session)}" + + # Fallback to request_id prefix for stdio transport + if hasattr(ctx, 'request_id') and ctx.request_id: + return f"req-{ctx.request_id[:8]}" if len(ctx.request_id) > 8 else f"req-{ctx.request_id}" + + return "default" + + +@mcp.tool() +def cursor_next(cursor_id: str, ctx: Context = None) -> dict: + """Get the next page of results for a pagination cursor + + Args: + cursor_id: The cursor ID from a previous paginated response + ctx: FastMCP context (auto-injected) + + Returns: + dict: Next page of results with updated pagination info + """ + if not cursor_id: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "cursor_id parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + sid = _get_session_id(ctx) + state = cursor_manager.advance_cursor(cursor_id, sid) + + if not state: + return { + "success": False, + "error": { + "code": "CURSOR_NOT_FOUND", + "message": f"Cursor '{cursor_id}' not found, expired, or belongs to another session" + }, + "timestamp": int(time.time() * 1000) + } + + current_page = cursor_manager.get_page(state) + response_cursor = cursor_id if state.has_more else None + + response = { + "success": True, + "result": current_page, + "pagination": 
{
+            "cursor_id": response_cursor,
+            "session_id": state.session_id,
+            "tool_name": state.tool_name,
+            "total_count": state.total_count,
+            "filtered_count": state.filtered_count,
+            "page_size": state.page_size,
+            "current_page": state.current_page,
+            "total_pages": state.total_pages,
+            "has_more": state.has_more,
+            "grep_pattern": state.grep_pattern,
+            "items_returned": len(current_page),
+            "ttl_remaining": state.ttl_remaining,
+        },
+        "timestamp": int(time.time() * 1000)
+    }
+
+    # Add prominent message for LLMs
+    if state.has_more:
+        remaining = state.filtered_count - (state.current_page * state.page_size)
+        response["_message"] = (
+            f"📄 Page {state.current_page}/{state.total_pages}: "
+            f"{len(current_page)} items. {remaining} more available. "
+            f"Continue with: cursor_next(cursor_id='{cursor_id}')"
+        )
+    else:
+        # Final page: filtered_count already reflects the complete result set,
+        # so no running tally is needed (current_page * page_size would overcount
+        # when the last page is partial).
+        response["_message"] = (
+            f"✅ Final page {state.current_page}/{state.total_pages}: "
+            f"{len(current_page)} items. All {state.filtered_count} items retrieved."
+ ) + + return response + + +@mcp.tool() +def cursor_list(ctx: Context = None, all_sessions: bool = False) -> dict: + """List active pagination cursors + + Args: + ctx: FastMCP context (auto-injected) + all_sessions: If True, list cursors from all sessions (admin use) + + Returns: + dict: List of active cursors with their metadata + """ + sid = None if all_sessions else _get_session_id(ctx) + cursors = cursor_manager.list_cursors(session_id=sid) + + return { + "success": True, + "result": cursors, + "stats": cursor_manager.get_stats(), + "timestamp": int(time.time() * 1000) + } + + +@mcp.tool() +def cursor_delete(cursor_id: str, ctx: Context = None) -> dict: + """Delete a pagination cursor to free resources + + Args: + cursor_id: The cursor ID to delete + ctx: FastMCP context (auto-injected) + + Returns: + dict: Operation result + """ + if not cursor_id: + return { + "success": False, + "error": { + "code": "MISSING_PARAMETER", + "message": "cursor_id parameter is required" + }, + "timestamp": int(time.time() * 1000) + } + + sid = _get_session_id(ctx) + deleted = cursor_manager.delete_cursor(cursor_id, sid) + + if deleted: + return { + "success": True, + "result": { + "deleted": True, + "cursor_id": cursor_id, + "message": "Cursor deleted successfully" + }, + "timestamp": int(time.time() * 1000) + } + else: + return { + "success": False, + "error": { + "code": "CURSOR_NOT_FOUND", + "message": f"Cursor '{cursor_id}' not found or belongs to another session" + }, + "timestamp": int(time.time() * 1000) + } + + +@mcp.tool() +def cursor_delete_all(ctx: Context = None) -> dict: + """Delete all pagination cursors for the current session + + Args: + ctx: FastMCP context (auto-injected) + + Returns: + dict: Number of cursors deleted + """ + sid = _get_session_id(ctx) + count = cursor_manager.delete_session_cursors(sid) + + return { + "success": True, + "result": { + "deleted_count": count, + "session_id": sid, + "message": f"Deleted {count} cursor(s) for session '{sid}'" + 
}, + "timestamp": int(time.time() * 1000) + } + + +# ================= End Cursor Management Tools ================= + # Function tools @mcp.tool() -def functions_list(offset: int = 0, limit: int = 100, - name_contains: str = None, - name_matches_regex: str = None, - port: int = None) -> dict: - """List functions with filtering and pagination - +def functions_list( + name_contains: str = None, + name_matches_regex: str = None, + port: int = None, + # Pagination parameters + page_size: int = DEFAULT_PAGE_SIZE, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """List functions with cursor-based pagination and grep filtering + Args: - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) - name_contains: Substring name filter (case-insensitive) - name_matches_regex: Regex name filter + name_contains: Substring name filter (case-insensitive, server-side) + name_matches_regex: Regex name filter (server-side) port: Specific Ghidra instance port (optional) - + page_size: Items per page (default: 50, max: 500) + grep: Regex pattern to filter results client-side (e.g., "main|init", "FUN_.*") + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Bypass pagination and return all results (use with caution) + ctx: FastMCP context (auto-injected) + + Returns: - dict: List of functions with pagination information + dict: List of functions with pagination info. Use cursor_next(cursor_id) for more. + + Examples: + # Get first page of all functions + functions_list() + + # Filter to functions containing "main" + functions_list(name_contains="main") + + # Client-side grep for FUN_* named functions + functions_list(grep="^FUN_") + + # Get all functions (bypasses pagination - use carefully!) 
+ functions_list(return_all=True) """ - port = _get_instance_port(port) - + port_to_use = _get_instance_port(port) + sid = _get_session_id(ctx) + + # Fetch a larger batch from Ghidra to enable client-side pagination + # We request more than page_size to allow grep filtering + fetch_limit = 5000 if return_all else max(page_size * 10, 500) + params = { - "offset": offset, - "limit": limit + "offset": 0, + "limit": fetch_limit } if name_contains: params["name_contains"] = name_contains if name_matches_regex: params["name_matches_regex"] = name_matches_regex - response = safe_get(port, "functions", params) + response = safe_get(port_to_use, "functions", params) simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified + + # Handle error responses + if not isinstance(simplified, dict) or not simplified.get("success", False): + return simplified + + # Extract the result list + result_data = simplified.get("result", []) + if not isinstance(result_data, list): + return simplified + + # Build query params for cursor hashing + query_params = { + "tool": "functions_list", + "port": port_to_use, + "name_contains": name_contains, + "name_matches_regex": name_matches_regex, + "grep": grep + } + + # Use the paginate_response helper + return paginate_response( + data=result_data, + query_params=query_params, + tool_name="functions_list", + session_id=sid, + page_size=page_size, + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) @mcp.tool() def functions_get(name: str = None, address: str = None, port: int = None) -> dict: @@ -1351,35 +2544,46 @@ def functions_get(name: str = None, address: str = None, port: int = None) -> di return simplify_response(response) @mcp.tool() -def 
functions_decompile(name: str = None, address: str = None, - syntax_tree: bool = False, style: str = "normalize", - start_line: int = None, end_line: int = None, max_lines: int = None, - port: int = None) -> dict: - """Get decompiled code for a function with optional line filtering for context management +def functions_decompile( + name: str = None, + address: str = None, + syntax_tree: bool = False, + style: str = "normalize", + port: int = None, + # Pagination parameters (line-based) + page_size: int = 50, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """Get decompiled code for a function with cursor-based line pagination Args: name: Function name (mutually exclusive with address) address: Function address in hex format (mutually exclusive with name) syntax_tree: Include syntax tree (default: False) style: Decompiler style (default: "normalize") - start_line: Start at this line number (1-indexed, optional) - end_line: End at this line number (inclusive, optional) - max_lines: Maximum number of lines to return (optional, takes precedence over end_line) port: Specific Ghidra instance port (optional) + page_size: Lines per page (default: 50, max: 500) + grep: Regex pattern to filter lines (e.g., "if.*==", "malloc|free") + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all lines without pagination (use with caution for large functions) + ctx: FastMCP context (auto-injected) + Returns: - dict: Contains function information and decompiled code (potentially filtered). - If filtering is applied, includes a 'filter' object with total_lines and applied parameters. + dict: Decompiled code with pagination. Use cursor_next(cursor_id) for more lines. 
Examples: - # Get first 20 lines of decompiled code - functions_decompile(name="main", max_lines=20) + # Get first 50 lines (default) + functions_decompile(name="main") - # Get lines 10-30 - functions_decompile(name="main", start_line=10, end_line=30) + # Search for specific patterns + functions_decompile(name="main", grep="if.*NULL") - # Get 15 lines starting from line 25 - functions_decompile(name="main", start_line=25, max_lines=15) + # Get all lines (for small functions) + functions_decompile(name="small_func", return_all=True) """ if not name and not address: return { @@ -1391,42 +2595,117 @@ def functions_decompile(name: str = None, address: str = None, "timestamp": int(time.time() * 1000) } - port = _get_instance_port(port) + port_to_use = _get_instance_port(port) params = { "syntax_tree": str(syntax_tree).lower(), "style": style } - # Add line filtering parameters if provided - if start_line is not None: - params["start_line"] = str(start_line) - if end_line is not None: - params["end_line"] = str(end_line) - if max_lines is not None: - params["max_lines"] = str(max_lines) - if address: endpoint = f"functions/{address}/decompile" + func_id = address else: endpoint = f"functions/by-name/{quote(name)}/decompile" + func_id = name - response = safe_get(port, endpoint, params) + response = safe_get(port_to_use, endpoint, params) simplified = simplify_response(response) - return simplified + if not simplified.get("success", False): + return simplified + + # Extract the decompiled code and split into lines + result = simplified.get("result", {}) + code = result.get("code", "") if isinstance(result, dict) else "" + + if not code: + return simplified # Return as-is if no code + + # Split code into lines, preserving line numbers + lines = code.split('\n') + # Create line objects with line numbers for better grep matching + line_objects = [{"line_num": i + 1, "code": line} for i, line in enumerate(lines)] + + # Build query params for cursor hashing + query_params = { 
+ "tool": "functions_decompile", + "port": port_to_use, + "name": name, + "address": address, + "style": style, + "grep": grep + } + + sid = _get_session_id(ctx) + + # Use pagination system + paginated = paginate_response( + data=line_objects, + query_params=query_params, + tool_name="functions_decompile", + session_id=sid, + page_size=min(page_size, MAX_PAGE_SIZE), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) + + # Transform result back to code format with line numbers + if paginated.get("success"): + page_lines = paginated.get("result", []) + # Format as "line_num: code" for clarity + formatted_lines = [f"{item['line_num']:4d}: {item['code']}" for item in page_lines] + paginated["result"] = { + "function": func_id, + "code_lines": formatted_lines, + "raw_lines": [item['code'] for item in page_lines] + } + # Add function metadata if available + if isinstance(result, dict): + for key in ["name", "address", "signature", "return_type"]: + if key in result: + paginated["result"][key] = result[key] + + return paginated @mcp.tool() -def functions_disassemble(name: str = None, address: str = None, port: int = None) -> dict: - """Get disassembly for a function - +def functions_disassemble( + name: str = None, + address: str = None, + port: int = None, + # Pagination parameters (instruction-based) + page_size: int = 50, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """Get disassembly for a function with cursor-based instruction pagination + Args: name: Function name (mutually exclusive with address) address: Function address in hex format (mutually exclusive with name) port: Specific Ghidra instance port (optional) - + page_size: Instructions per page (default: 50, max: 500) + grep: Regex pattern to filter instructions (e.g., "CALL", "JMP|JNZ", "MOV.*EAX") + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all instructions without pagination + 
ctx: FastMCP context (auto-injected) + + Returns: - dict: Contains function information and disassembly text + dict: Disassembly with pagination. Use cursor_next(cursor_id) for more instructions. + + Examples: + # Get first 50 instructions + functions_disassemble(name="main") + + # Find all CALL instructions + functions_disassemble(name="main", grep="CALL") + + # Find jumps and conditional jumps + functions_disassemble(name="main", grep="^J") """ if not name and not address: return { @@ -1437,16 +2716,84 @@ def functions_disassemble(name: str = None, address: str = None, port: int = Non }, "timestamp": int(time.time() * 1000) } - - port = _get_instance_port(port) - + + port_to_use = _get_instance_port(port) + if address: endpoint = f"functions/{address}/disassembly" + func_id = address else: endpoint = f"functions/by-name/{quote(name)}/disassembly" - - response = safe_get(port, endpoint) - return simplify_response(response) + func_id = name + + response = safe_get(port_to_use, endpoint) + simplified = simplify_response(response) + + if not simplified.get("success", False): + return simplified + + # Extract the disassembly - could be text or structured + result = simplified.get("result", {}) + + # Handle different response formats + if isinstance(result, dict): + disasm_text = result.get("disassembly", "") or result.get("text", "") + instructions = result.get("instructions", []) + elif isinstance(result, str): + disasm_text = result + instructions = [] + else: + disasm_text = "" + instructions = [] + + # If we have structured instructions, use them; otherwise parse text + if instructions: + # Already have instruction objects + line_objects = instructions + elif disasm_text: + # Split text into lines + lines = disasm_text.strip().split('\n') + line_objects = [{"addr": f"line_{i+1}", "instruction": line} for i, line in enumerate(lines) if line.strip()] + else: + return simplified # Return as-is if no disassembly + + # Build query params for cursor hashing + 
query_params = { + "tool": "functions_disassemble", + "port": port_to_use, + "name": name, + "address": address, + "grep": grep + } + + sid = _get_session_id(ctx) + + # Use pagination system + paginated = paginate_response( + data=line_objects, + query_params=query_params, + tool_name="functions_disassemble", + session_id=sid, + page_size=min(page_size, MAX_PAGE_SIZE), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) + + # Add function context to result + if paginated.get("success"): + page_instructions = paginated.get("result", []) + paginated["result"] = { + "function": func_id, + "instructions": page_instructions + } + # Add function metadata if available + if isinstance(result, dict): + for key in ["name", "address", "entry_point", "size"]: + if key in result: + paginated["result"][key] = result[key] + + return paginated @mcp.tool() def functions_create(address: str, port: int = None) -> dict: @@ -1553,16 +2900,39 @@ def functions_set_signature(name: str = None, address: str = None, signature: st return simplify_response(response) @mcp.tool() -def functions_get_variables(name: str = None, address: str = None, port: int = None) -> dict: - """Get variables for a function - +def functions_get_variables( + name: str = None, + address: str = None, + port: int = None, + # Pagination parameters + page_size: int = DEFAULT_PAGE_SIZE, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """Get variables for a function with cursor-based pagination + Args: name: Function name (mutually exclusive with address) address: Function address in hex format (mutually exclusive with name) port: Specific Ghidra instance port (optional) - + page_size: Variables per page (default: 50, max: 500) + grep: Regex pattern to filter variables (e.g., "local_", "param", "ptr.*int") + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all variables without pagination + ctx: FastMCP 
context (auto-injected) + + Returns: - dict: Contains function information and list of variables + dict: Variables with pagination. Use cursor_next(cursor_id) for more. + + Examples: + # Get all local variables + functions_get_variables(name="main", grep="local_") + + # Find pointer variables + functions_get_variables(name="main", grep="ptr|\\*") """ if not name and not address: return { @@ -1573,16 +2943,65 @@ def functions_get_variables(name: str = None, address: str = None, port: int = N }, "timestamp": int(time.time() * 1000) } - - port = _get_instance_port(port) - + + port_to_use = _get_instance_port(port) + if address: endpoint = f"functions/{address}/variables" + func_id = address else: endpoint = f"functions/by-name/{quote(name)}/variables" - - response = safe_get(port, endpoint) - return simplify_response(response) + func_id = name + + response = safe_get(port_to_use, endpoint) + simplified = simplify_response(response) + + if not simplified.get("success", False): + return simplified + + # Extract variables list + result = simplified.get("result", {}) + variables = result.get("variables", []) if isinstance(result, dict) else [] + + if not variables: + return simplified # Return as-is if no variables + + # Build query params for cursor hashing + query_params = { + "tool": "functions_get_variables", + "port": port_to_use, + "name": name, + "address": address, + "grep": grep + } + + sid = _get_session_id(ctx) + + # Use pagination system + paginated = paginate_response( + data=variables, + query_params=query_params, + tool_name="functions_get_variables", + session_id=sid, + page_size=min(page_size, MAX_PAGE_SIZE), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) + + # Add function context + if paginated.get("success"): + paginated["result"] = { + "function": func_id, + "variables": paginated.get("result", []) + } + # Preserve other metadata + if isinstance(result, dict): + for key in ["name", "address", "parameter_count", 
"local_count"]: + if key in result: + paginated["result"][key] = result[key] + + return paginated # Memory tools @mcp.tool() @@ -1696,37 +3115,52 @@ def memory_write(address: str, bytes_data: str, format: str = "hex", port: int = # Xrefs tools @mcp.tool() -def xrefs_list(to_addr: str = None, from_addr: str = None, type: str = None, - offset: int = 0, limit: int = 100, port: int = None) -> dict: - """List cross-references with filtering and pagination - +def xrefs_list( + to_addr: str = None, + from_addr: str = None, + type: str = None, + port: int = None, + # Pagination parameters + page_size: int = DEFAULT_PAGE_SIZE, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """List cross-references with filtering and cursor-based pagination + Args: to_addr: Filter references to this address (hexadecimal) - from_addr: Filter references from this address (hexadecimal) + from_addr: Filter references from this address (hexadecimal) type: Filter by reference type (e.g. 
"CALL", "READ", "WRITE") - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) port: Specific Ghidra instance port (optional) - + page_size: Items per page (default: 50, max: 500) + grep: Regex pattern to filter results + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all results without pagination (use with caution) + ctx: FastMCP context (auto-injected) + + Returns: - dict: Cross-references matching the filters + dict: Cross-references with pagination metadata and cursor for more results """ # At least one of the address parameters must be provided if not to_addr and not from_addr: return { "success": False, "error": { - "code": "MISSING_PARAMETER", + "code": "MISSING_PARAMETER", "message": "Either to_addr or from_addr parameter is required" }, "timestamp": int(time.time() * 1000) } - - port = _get_instance_port(port) - + + port_to_use = _get_instance_port(port) + + # Fetch large batch for client-side pagination params = { - "offset": offset, - "limit": limit + "offset": 0, + "limit": 10000 # Fetch up to 10K for cursor pagination } if to_addr: params["to_addr"] = to_addr @@ -1735,41 +3169,76 @@ def xrefs_list(to_addr: str = None, from_addr: str = None, type: str = None, if type: params["type"] = type - response = safe_get(port, "xrefs", params) + response = safe_get(port_to_use, "xrefs", params) simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified + + if not simplified.get("success", False): + return simplified + + all_xrefs = simplified.get("result", []) + + # Build query params for cursor hashing + query_params = { + "tool": "xrefs_list", + "port": port_to_use, + "to_addr": to_addr, + "from_addr": from_addr, + "type": type, + "grep": 
grep + } + + sid = _get_session_id(ctx) + + return paginate_response( + data=all_xrefs, + query_params=query_params, + tool_name="xrefs_list", + session_id=sid, + page_size=page_size, + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) # Data tools @mcp.tool() -def data_list(offset: int = 0, limit: int = 100, addr: str = None, - name: str = None, name_contains: str = None, type: str = None, - port: int = None) -> dict: - """List defined data items with filtering and pagination - +def data_list( + addr: str = None, + name: str = None, + name_contains: str = None, + type: str = None, + port: int = None, + # Pagination parameters + page_size: int = DEFAULT_PAGE_SIZE, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """List defined data items with filtering and cursor-based pagination + Args: - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) addr: Filter by address (hexadecimal) name: Exact name match filter (case-sensitive) name_contains: Substring name filter (case-insensitive) type: Filter by data type (e.g. 
"string", "dword") port: Specific Ghidra instance port (optional) - + page_size: Items per page (default: 50, max: 500) + grep: Regex pattern to filter results + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all results without pagination (use with caution) + ctx: FastMCP context (auto-injected) + + Returns: - dict: Data items matching the filters + dict: Data items with pagination metadata and cursor for more results """ - port = _get_instance_port(port) - + port_to_use = _get_instance_port(port) + + # Fetch large batch for client-side pagination params = { - "offset": offset, - "limit": limit + "offset": 0, + "limit": 10000 # Fetch up to 10K for cursor pagination } if addr: params["addr"] = addr @@ -1780,16 +3249,37 @@ def data_list(offset: int = 0, limit: int = 100, addr: str = None, if type: params["type"] = type - response = safe_get(port, "data", params) + response = safe_get(port_to_use, "data", params) simplified = simplify_response(response) - - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) - - return simplified + + if not simplified.get("success", False): + return simplified + + all_data = simplified.get("result", []) + + # Build query params for cursor hashing + query_params = { + "tool": "data_list", + "port": port_to_use, + "addr": addr, + "name": name, + "name_contains": name_contains, + "type": type, + "grep": grep + } + + sid = _get_session_id(ctx) + + return paginate_response( + data=all_data, + query_params=query_params, + tool_name="data_list", + session_id=sid, + page_size=page_size, + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) @mcp.tool() def data_create(address: str, data_type: str, size: int = None, port: int = None) -> dict: @@ -1828,30 +3318,89 @@ def data_create(address: 
str, data_type: str, size: int = None, port: int = None return simplify_response(response) @mcp.tool() -def data_list_strings(offset: int = 0, limit: int = 2000, filter: str = None, port: int = None) -> dict: - """List all defined strings in the binary with their memory addresses - +def data_list_strings( + filter: str = None, + port: int = None, + # Pagination parameters + page_size: int = DEFAULT_PAGE_SIZE, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """List all defined strings in the binary with cursor-based pagination and grep filtering + Args: - offset: Pagination offset (default: 0) - limit: Maximum strings to return (default: 2000) - filter: Optional string content filter + filter: Server-side string content filter port: Specific Ghidra instance port (optional) - + page_size: Items per page (default: 50, max: 500) + grep: Regex pattern to filter results client-side (e.g., "password|key", "http://") + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Bypass pagination and return all strings (use with caution) + ctx: FastMCP context (auto-injected) + + Returns: - dict: List of string data with addresses, values, and metadata + dict: List of string data with pagination info. Use cursor_next(cursor_id) for more. 
+ + Examples: + # Get first page of strings + data_list_strings() + + # Filter to strings containing "error" + data_list_strings(filter="error") + + # Client-side grep for URLs + data_list_strings(grep="https?://") + + # Get all strings (bypasses pagination) + data_list_strings(return_all=True) """ - port = _get_instance_port(port) - + port_to_use = _get_instance_port(port) + sid = _get_session_id(ctx) + + # Fetch larger batch for client-side pagination + fetch_limit = 10000 if return_all else max(page_size * 10, 2000) + params = { - "offset": offset, - "limit": limit + "offset": 0, + "limit": fetch_limit } - + if filter: params["filter"] = filter - - response = safe_get(port, "strings", params) - return simplify_response(response) + + response = safe_get(port_to_use, "strings", params) + simplified = simplify_response(response) + + # Handle error responses + if not isinstance(simplified, dict) or not simplified.get("success", False): + return simplified + + # Extract the result list + result_data = simplified.get("result", []) + if not isinstance(result_data, list): + return simplified + + # Build query params for cursor hashing + query_params = { + "tool": "data_list_strings", + "port": port_to_use, + "filter": filter, + "grep": grep + } + + # Use the paginate_response helper + return paginate_response( + data=result_data, + query_params=query_params, + tool_name="data_list_strings", + session_id=sid, + page_size=page_size, + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) @mcp.tool() def data_rename(address: str, name: str, port: int = None) -> dict: @@ -1950,48 +3499,97 @@ def data_set_type(address: str, data_type: str, port: int = None) -> dict: # Struct tools @mcp.tool() -def structs_list(offset: int = 0, limit: int = 100, category: str = None, port: int = None) -> dict: - """List all struct data types in the program +def structs_list( + category: str = None, + port: int = None, + # Pagination parameters + page_size: int = 
DEFAULT_PAGE_SIZE, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """List all struct data types in the program with cursor-based pagination Args: - offset: Pagination offset (default: 0) - limit: Maximum items to return (default: 100) category: Filter by category path (e.g. "/winapi") port: Specific Ghidra instance port (optional) + page_size: Items per page (default: 50, max: 500) + grep: Regex pattern to filter results (searches struct names) + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all results without pagination (use with caution) + ctx: FastMCP context (auto-injected) + Returns: - dict: List of structs with name, size, and field count + dict: Structs with pagination metadata and cursor for more results """ - port = _get_instance_port(port) + port_to_use = _get_instance_port(port) + # Fetch large batch for client-side pagination params = { - "offset": offset, - "limit": limit + "offset": 0, + "limit": 10000 # Fetch up to 10K for cursor pagination } if category: params["category"] = category - response = safe_get(port, "structs", params) + response = safe_get(port_to_use, "structs", params) simplified = simplify_response(response) - # Ensure we maintain pagination metadata - if isinstance(simplified, dict) and "error" not in simplified: - simplified.setdefault("size", len(simplified.get("result", []))) - simplified.setdefault("offset", offset) - simplified.setdefault("limit", limit) + if not simplified.get("success", False): + return simplified - return simplified + all_structs = simplified.get("result", []) + + # Build query params for cursor hashing + query_params = { + "tool": "structs_list", + "port": port_to_use, + "category": category, + "grep": grep + } + + sid = _get_session_id(ctx) + + return paginate_response( + data=all_structs, + query_params=query_params, + tool_name="structs_list", + session_id=sid, + page_size=page_size, + grep=grep, + 
grep_ignorecase=grep_ignorecase, + return_all=return_all + ) @mcp.tool() -def structs_get(name: str, port: int = None) -> dict: +def structs_get( + name: str, + port: int = None, + # Pagination parameters (field-based) + page_size: int = DEFAULT_PAGE_SIZE, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: """Get detailed information about a specific struct including all fields + Supports pagination for structs with many fields (e.g., large C++ classes). + Args: name: Struct name port: Specific Ghidra instance port (optional) + page_size: Number of fields per page (default: 50, max: 500) + grep: Regex pattern to filter fields (matches field name, type, or comment) + grep_ignorecase: Case-insensitive grep matching (default: True) + return_all: Return all fields without pagination (WARNING: large structs may have 100+ fields) + ctx: FastMCP context (auto-injected) + Returns: - dict: Struct details including all fields with their names, types, and offsets + dict: Struct details with paginated fields list """ if not name: return { @@ -2004,10 +3602,66 @@ def structs_get(name: str, port: int = None) -> dict: } port = _get_instance_port(port) + sid = _get_session_id(ctx) params = {"name": name} response = safe_get(port, "structs", params) - return simplify_response(response) + simplified = simplify_response(response) + + # Extract struct info and fields for pagination + if not simplified.get("success", True): + return simplified + + result = simplified.get("result", simplified) + + # Get struct metadata (preserve everything except fields for pagination) + struct_info = {} + fields = [] + + if isinstance(result, dict): + for key, value in result.items(): + if key == "fields" and isinstance(value, list): + fields = value + else: + struct_info[key] = value + + # If no fields or very few, return as-is + if len(fields) <= 10 and not grep: + return simplified + + # Build query params for cursor hashing + 
query_params = { + "tool": "structs_get", + "port": port, + "name": name + } + + # Paginate fields + paginated = paginate_response( + data=fields, + query_params=query_params, + tool_name="structs_get", + session_id=sid, + page_size=page_size, + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) + + # Merge struct metadata with paginated fields + if paginated.get("success"): + paginated["struct_name"] = struct_info.get("name", name) + paginated["struct_size"] = struct_info.get("size", struct_info.get("length")) + paginated["struct_category"] = struct_info.get("category", struct_info.get("categoryPath")) + paginated["struct_description"] = struct_info.get("description") + # The paginated "result" contains the fields + paginated["fields"] = paginated.pop("result", []) + + # Update message to be struct-specific + if "_message" in paginated: + paginated["_message"] = paginated["_message"].replace("items", "fields") + + return paginated @mcp.tool() def structs_create(name: str, category: str = None, description: str = None, port: int = None) -> dict: @@ -2194,44 +3848,143 @@ def analysis_run(port: int = None, analysis_options: dict = None) -> dict: return simplify_response(response) @mcp.tool() -def analysis_get_callgraph(name: str = None, address: str = None, max_depth: int = 3, port: int = None) -> dict: - """Get function call graph visualization data +def analysis_get_callgraph( + name: str = None, + address: str = None, + max_depth: int = 3, + port: int = None, + # Pagination parameters + page_size: int = DEFAULT_PAGE_SIZE, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """Get function call graph with cursor-based pagination on edges Args: name: Starting function name (mutually exclusive with address) address: Starting function address (mutually exclusive with name) - max_depth: Maximum call depth to analyze (default: 3). 
Increase for deeper call chains (e.g., 10-15 for complex functions) + max_depth: Maximum call depth to analyze (default: 3) port: Specific Ghidra instance port (optional) + page_size: Edges per page (default: 50, max: 500) + grep: Regex pattern to filter edges (e.g., "malloc|free", "FUN_00") + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all edges without pagination + ctx: FastMCP context (auto-injected) + Returns: - dict: Graph data with nodes and edges + dict: Call graph with paginated edges. Use cursor_next(cursor_id) for more. + + Examples: + # Get callgraph, filter for memory functions + analysis_get_callgraph(name="main", grep="alloc|free|memcpy") + + # Deep analysis with pagination + analysis_get_callgraph(name="main", max_depth=10, page_size=100) """ - port = _get_instance_port(port) - + port_to_use = _get_instance_port(port) + params = {"max_depth": max_depth} - + # Explicitly pass either name or address parameter based on what was provided if address: params["address"] = address + func_id = address elif name: params["name"] = name + func_id = name + else: + func_id = "entry_point" # If neither is provided, the Java endpoint will use the entry point - - response = safe_get(port, "analysis/callgraph", params) - return simplify_response(response) + + response = safe_get(port_to_use, "analysis/callgraph", params) + simplified = simplify_response(response) + + if not simplified.get("success", False): + return simplified + + # Extract graph data - typically has nodes and edges + result = simplified.get("result", {}) + edges = result.get("edges", []) if isinstance(result, dict) else [] + nodes = result.get("nodes", []) if isinstance(result, dict) else [] + + if not edges: + return simplified # Return as-is if no edges + + # Build query params for cursor hashing + query_params = { + "tool": "analysis_get_callgraph", + "port": port_to_use, + "name": name, + "address": address, + "max_depth": max_depth, + "grep": grep + } + + sid = 
_get_session_id(ctx) + + # Paginate edges (nodes are typically smaller, include all) + paginated = paginate_response( + data=edges, + query_params=query_params, + tool_name="analysis_get_callgraph", + session_id=sid, + page_size=min(page_size, MAX_PAGE_SIZE), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) + + # Reconstruct result with paginated edges + if paginated.get("success"): + paginated["result"] = { + "root_function": func_id, + "max_depth": max_depth, + "nodes": nodes, # Include all nodes for context + "edges": paginated.get("result", []), + "total_nodes": len(nodes), + } + + return paginated @mcp.tool() -def analysis_get_dataflow(address: str, direction: str = "forward", max_steps: int = 50, port: int = None) -> dict: - """Perform data flow analysis from an address - +def analysis_get_dataflow( + address: str, + direction: str = "forward", + max_steps: int = 50, + port: int = None, + # Pagination parameters + page_size: int = DEFAULT_PAGE_SIZE, + grep: str = None, + grep_ignorecase: bool = True, + return_all: bool = False, + ctx: Context = None +) -> dict: + """Perform data flow analysis with cursor-based pagination on steps + Args: address: Starting address in hex format direction: "forward" or "backward" (default: "forward") max_steps: Maximum analysis steps (default: 50) port: Specific Ghidra instance port (optional) - + page_size: Steps per page (default: 50, max: 500) + grep: Regex pattern to filter steps (e.g., "MOV|LEA", "EAX|RAX") + grep_ignorecase: Case-insensitive grep (default: True) + return_all: Return all steps without pagination + ctx: FastMCP context (auto-injected) + + Returns: - dict: Data flow analysis results + dict: Data flow steps with pagination. Use cursor_next(cursor_id) for more. 
+ + Examples: + # Track data flow, filter for memory operations + analysis_get_dataflow(address="0x401000", grep="MOV|PUSH|POP") + + # Backward flow to find data sources + analysis_get_dataflow(address="0x401000", direction="backward", grep="LEA|MOV") """ if not address: return { @@ -2242,17 +3995,66 @@ def analysis_get_dataflow(address: str, direction: str = "forward", max_steps: i }, "timestamp": int(time.time() * 1000) } - - port = _get_instance_port(port) - + + port_to_use = _get_instance_port(port) + params = { "address": address, "direction": direction, "max_steps": max_steps } - - response = safe_get(port, "analysis/dataflow", params) - return simplify_response(response) + + response = safe_get(port_to_use, "analysis/dataflow", params) + simplified = simplify_response(response) + + if not simplified.get("success", False): + return simplified + + # Extract dataflow steps + result = simplified.get("result", {}) + steps = result.get("steps", []) if isinstance(result, dict) else [] + + if not steps: + return simplified # Return as-is if no steps + + # Build query params for cursor hashing + query_params = { + "tool": "analysis_get_dataflow", + "port": port_to_use, + "address": address, + "direction": direction, + "max_steps": max_steps, + "grep": grep + } + + sid = _get_session_id(ctx) + + # Paginate steps + paginated = paginate_response( + data=steps, + query_params=query_params, + tool_name="analysis_get_dataflow", + session_id=sid, + page_size=min(page_size, MAX_PAGE_SIZE), + grep=grep, + grep_ignorecase=grep_ignorecase, + return_all=return_all + ) + + # Reconstruct result with paginated steps + if paginated.get("success"): + paginated["result"] = { + "start_address": address, + "direction": direction, + "steps": paginated.get("result", []), + } + # Preserve other metadata + if isinstance(result, dict): + for key in ["sources", "sinks", "total_steps"]: + if key in result: + paginated["result"][key] = result[key] + + return paginated @mcp.tool() def 
ui_get_current_address(port: int = None) -> dict: diff --git a/pyproject.toml b/pyproject.toml index 770d69b..871f90a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,12 +1,12 @@ [project] name = "ghydramcp" -version = "2.0.0" +version = "2025.12.1" description = "AI-assisted reverse engineering bridge: a multi-instance Ghidra plugin exposed via a HATEOAS REST API plus an MCP Python bridge for decompilation, analysis & binary manipulation" readme = "README.md" requires-python = ">=3.11" dependencies = [ - "mcp==1.6.0", - "requests==2.32.3", + "mcp>=1.22.0", + "requests>=2.32.3", ] [project.scripts]