mcghidra/bridge_mcp_hydra.py
Ryan Malloy 1143489924
Some checks are pending
Build Ghidra Plugin / build (push) Waiting to run
refactor: Rename project from ghydramcp to mcghidra
- Rename src/ghydramcp → src/mcghidra
- Rename GhydraMCPPlugin.java → MCGhidraPlugin.java
- Update all imports, class names, and references
- Update pyproject.toml package name and script entry
- Update Docker image names and container prefixes
- Update environment variables: GHYDRA_* → MCGHIDRA_*
- Update all documentation references
2026-02-07 02:13:53 -07:00

6874 lines
231 KiB
Python

# /// script
# requires-python = ">=3.11"
# dependencies = [
# "mcp>=1.22.0",
# "requests>=2.32.3",
# ]
# ///
# MCGhidra Bridge for Ghidra HATEOAS API - Optimized for MCP integration
# Provides namespaced tools for interacting with Ghidra's reverse engineering capabilities
# Features: Cursor-based pagination, grep filtering, session isolation
import os
import signal
import sys
import threading
import time
from threading import Lock
from typing import Dict, List, Optional, Union, Any
from urllib.parse import quote, urlencode, urlparse
import requests
from mcp.server.fastmcp import FastMCP, Context
# ================= Core Infrastructure =================
# Origins accepted for state-changing requests, read once at import time from
# the comma-separated GHIDRA_ALLOWED_ORIGINS environment variable.
# Each entry is stripped and empty entries are dropped, so a value such as
# "http://a, http://b" still matches the whitespace-free "scheme://host[:port]"
# strings that validate_origin() compares against this list.
ALLOWED_ORIGINS = [
    origin.strip()
    for origin in os.environ.get(
        "GHIDRA_ALLOWED_ORIGINS", "http://localhost").split(",")
    if origin.strip()
]
# Known Ghidra instances keyed by port; instances_lock is the lock taken around
# lookups in this mapping (see get_instance_url).
active_instances: Dict[int, dict] = {}
instances_lock = Lock()
DEFAULT_GHIDRA_PORT = 8192
DEFAULT_GHIDRA_HOST = "localhost"
# Port windows starting at the default port (10 and 20 ports wide).
QUICK_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+10)
FULL_DISCOVERY_RANGE = range(DEFAULT_GHIDRA_PORT, DEFAULT_GHIDRA_PORT+20)
BRIDGE_VERSION = "2025-12-01"
# register_instance() refuses plugins that report a different api_version.
REQUIRED_API_VERSION = 2010
# Port used by helpers when the caller does not name one explicitly.
current_instance_port = DEFAULT_GHIDRA_PORT
# ================= Cursor-Based Pagination System =================
# Provides efficient pagination with grep filtering for large responses
# Inspired by mcplaywright pagination system
import re
import hashlib
import json
from dataclasses import dataclass, field
from typing import Callable, Iterator
from collections import OrderedDict
# Configuration
CURSOR_TTL_SECONDS = 5 * 60  # Idle lifetime of a cursor, in seconds
CURSOR_MAX_CACHE_SIZE = 100  # Cursors kept before least-recently-used eviction
DEFAULT_PAGE_SIZE = 50  # Items per page when the caller does not choose
MAX_PAGE_SIZE = 500  # Upper bound applied to requested page sizes
TOKEN_ESTIMATION_RATIO = 4.0  # Approximate characters per token
# Limits applied to user-supplied grep patterns (ReDoS protection)
MAX_GREP_PATTERN_LENGTH = 500  # Longest accepted pattern, in characters
MAX_GREP_REPETITION_OPS = 15  # Most repetition operators (* + ? {}) accepted
MAX_GREP_RECURSION_DEPTH = 10  # Deepest nesting searched when grepping data
# ================= Progress Reporting System =================
# Provides async progress updates for long-running operations
import asyncio
from contextlib import asynccontextmanager
class ProgressReporter:
    """Throttled progress reporting for a long-running operation.

    Wraps a FastMCP ``Context``. Progress notifications are sent only when the
    counter has moved by at least 5% of ``total`` (minimum one unit) since the
    last notification, or when the counter is at or past ``total``. With a
    falsy ``ctx`` nothing is sent and only the internal counter changes.
    Exceptions raised by the context are swallowed so that reporting can
    never break the operation being tracked.

    Usage:
        progress = ProgressReporter(ctx, "Loading functions", total=1000)
        for i, item in enumerate(items):
            await progress.update(i + 1)
        await progress.complete("Loaded {count} functions")
    """

    def __init__(self, ctx: Context, operation: str, total: int = 100):
        """Set up a reporter.

        Args:
            ctx: FastMCP Context, or None to disable reporting
            operation: Human-readable label prefixed to status messages
            total: Expected number of items/steps (default 100, i.e. percent)
        """
        self.ctx = ctx
        self.operation = operation
        self.total = total
        self.current = 0
        self._last_reported = 0
        # One notification per 5% of the total, but never less than one unit.
        self._report_threshold = max(1, total // 20)

    async def update(self, progress: int = None, message: str = None):
        """Record new progress and notify the client when enough has changed.

        Args:
            progress: Absolute progress value; when None the counter is
                incremented by one instead
            message: Optional status text, sent only together with a
                progress notification
        """
        if progress is None:
            self.current += 1
        else:
            self.current = progress

        if not self.ctx:
            return
        due = self.current - self._last_reported >= self._report_threshold
        if not (due or self.current >= self.total):
            return  # below the threshold: stay quiet to avoid spamming

        try:
            await self.ctx.report_progress(
                progress=self.current,
                total=self.total
            )
            if message:
                await self.ctx.info(f"{self.operation}: {message}")
            # Only reached when the notification went through.
            self._last_reported = self.current
        except Exception:
            pass  # progress reporting is best-effort

    async def info(self, message: str):
        """Send ``message`` to the client, prefixed with the operation name."""
        if not self.ctx:
            return
        try:
            await self.ctx.info(f"{self.operation}: {message}")
        except Exception:
            pass

    async def complete(self, message: str = None):
        """Jump the counter to ``total`` and send a final notification.

        Args:
            message: Optional ``str.format`` template; the fields ``count``,
                ``total`` and ``operation`` are available
        """
        self.current = self.total
        if not self.ctx:
            return
        try:
            await self.ctx.report_progress(progress=self.total, total=self.total)
            if message:
                final_text = message.format(
                    count=self.current,
                    total=self.total,
                    operation=self.operation
                )
                await self.ctx.info(final_text)
        except Exception:
            pass
async def report_progress(ctx: Context, progress: int, total: int, message: str = None):
    """Send a single progress notification, ignoring any reporting failure.

    Args:
        ctx: FastMCP Context; when falsy nothing is sent
        progress: Current progress value
        total: Total expected value
        message: Optional status text sent after the progress notification
    """
    if not ctx:
        return
    try:
        await ctx.report_progress(progress=progress, total=total)
        if message:
            await ctx.info(message)
    except Exception:
        pass  # best-effort: never let reporting fail the caller
async def report_step(ctx: Context, step: int, total_steps: int, description: str):
    """Announce one step of a multi-step operation, ignoring reporting failures.

    Sends a progress notification followed by a "Step i/n: ..." info message.

    Args:
        ctx: FastMCP Context; when falsy nothing is sent
        step: Current step number (1-indexed)
        total_steps: Total number of steps
        description: What this step is doing
    """
    if not ctx:
        return
    try:
        await ctx.report_progress(progress=step, total=total_steps)
        await ctx.info(f"Step {step}/{total_steps}: {description}")
    except Exception:
        pass  # best-effort: never let reporting fail the caller
def compile_safe_pattern(pattern: str, flags: int = 0) -> re.Pattern:
    """Compile a user-supplied regex after heuristic ReDoS screening.

    The screening is textual: it limits the pattern length, counts every
    ``*``, ``+`` and ``?`` character plus every ``{...}`` bound made of digits
    and commas, and rejects a quantified group that itself contains ``*`` or
    ``+`` (e.g. ``(a+)+``).

    Args:
        pattern: Regex pattern string
        flags: Regex compilation flags

    Returns:
        Compiled regex pattern

    Raises:
        ValueError: If the pattern is empty, fails a safety check, or is not
            a valid regular expression
    """
    if not pattern:
        raise ValueError("Empty pattern")

    # Length limit
    pattern_length = len(pattern)
    if pattern_length > MAX_GREP_PATTERN_LENGTH:
        raise ValueError(
            f"Pattern too long ({pattern_length} chars, max {MAX_GREP_PATTERN_LENGTH}). "
            "Consider using a simpler pattern or substring match."
        )

    # Repetition operators: the unbounded ones, then bounded {n,m} forms
    quantifier_count = sum(pattern.count(symbol) for symbol in "*+?")
    quantifier_count += len(re.findall(r'\{[0-9,]+\}', pattern))
    if quantifier_count > MAX_GREP_REPETITION_OPS:
        raise ValueError(
            f"Pattern has too many repetition operators ({quantifier_count}, max {MAX_GREP_REPETITION_OPS}). "
            "This could cause performance issues. Consider simplifying the pattern."
        )

    # Nested quantifiers are the classic source of exponential backtracking
    nested_quantifier_shapes = (
        r'\([^)]*[*+][^)]*\)[*+]',  # (a+)+ or (a*)*
        r'\([^)]*[*+][^)]*\)\{',  # (a+){n,m}
    )
    if any(re.search(shape, pattern) for shape in nested_quantifier_shapes):
        raise ValueError(
            "Pattern contains nested quantifiers which could cause exponential backtracking. "
            "Example: (a+)+ is dangerous. Consider using atomic groups or simplifying."
        )

    try:
        compiled = re.compile(pattern, flags)
    except re.error as exc:
        raise ValueError(f"Invalid regex pattern: {exc}")
    return compiled
@dataclass
class CursorState:
    """Snapshot of one paginated query, owned by a single session.

    Holds the (optionally grep-filtered) result list together with the
    paging position and the timestamps used for idle expiry.
    """
    cursor_id: str  # Unique cursor identifier
    session_id: str  # Session isolation key
    tool_name: str  # Tool that created this cursor
    query_hash: str  # Hash of original query parameters
    data: List[Any]  # Full result set (or filtered)
    total_count: int  # Total items before pagination
    filtered_count: int  # Items after grep filtering
    current_offset: int = 0
    page_size: int = DEFAULT_PAGE_SIZE
    grep_pattern: str = None
    grep_flags: int = 0
    created_at: float = field(default_factory=time.time)
    last_accessed: float = field(default_factory=time.time)

    @property
    def is_expired(self) -> bool:
        """True once the cursor has been idle longer than CURSOR_TTL_SECONDS."""
        idle_seconds = time.time() - self.last_accessed
        return idle_seconds > CURSOR_TTL_SECONDS

    @property
    def has_more(self) -> bool:
        """True when items remain beyond the current page."""
        end_of_page = self.current_offset + self.page_size
        return end_of_page < self.filtered_count

    @property
    def current_page(self) -> int:
        """1-based number of the page at the current offset."""
        pages_before = self.current_offset // self.page_size
        return pages_before + 1

    @property
    def total_pages(self) -> int:
        """Number of pages needed for the filtered results (at least 1)."""
        rounded_up = (self.filtered_count + self.page_size - 1) // self.page_size
        return max(1, rounded_up)

    @property
    def ttl_remaining(self) -> int:
        """Whole seconds left before idle expiry, never negative."""
        idle_seconds = time.time() - self.last_accessed
        return max(0, int(CURSOR_TTL_SECONDS - idle_seconds))

    def verify_session(self, session_id: str) -> bool:
        """Return True when ``session_id`` is the session owning this cursor."""
        return self.session_id == session_id
class CursorManager:
    """Registry of pagination cursors with idle expiry and session isolation.

    Cursors are kept in an ``OrderedDict`` ordered from least to most recently
    used. A cursor is dropped once it has been idle longer than
    ``CURSOR_TTL_SECONDS`` or when the cache exceeds ``CURSOR_MAX_CACHE_SIZE``
    (oldest first). Changes to the cursor tables are serialized by an internal
    lock. Methods that accept a ``session_id`` refuse to act on a cursor owned
    by a different session; a falsy ``session_id`` skips that check.
    """

    def __init__(self):
        # cursor_id -> CursorState, least recently used first
        self._cursors: OrderedDict[str, CursorState] = OrderedDict()
        self._session_cursors: Dict[str, set] = {}  # session_id -> set of cursor_ids
        self._lock = Lock()

    def _generate_cursor_id(self, query_hash: str, session_id: str) -> str:
        """Derive a 16-hex-digit cursor ID from session, query and current time"""
        unique = f"{session_id}-{query_hash}-{time.time()}-{id(self)}"
        return hashlib.sha256(unique.encode()).hexdigest()[:16]

    def _untrack(self, cursor_id: str, session_id: str) -> None:
        """Remove cursor_id from its session index (call while holding lock)

        The session entry itself is removed once its last cursor is gone, so
        the index does not accumulate empty sets for sessions that ended.
        """
        owned = self._session_cursors.get(session_id)
        if owned is None:
            return
        owned.discard(cursor_id)
        if not owned:
            del self._session_cursors[session_id]

    def _cleanup_expired(self):
        """Remove expired cursors (call while holding lock)"""
        expired = [cid for cid, state in self._cursors.items() if state.is_expired]
        for cid in expired:
            state = self._cursors.pop(cid)
            self._untrack(cid, state.session_id)
        # Also enforce max cache size (LRU eviction)
        while len(self._cursors) > CURSOR_MAX_CACHE_SIZE:
            oldest_id, oldest_state = self._cursors.popitem(last=False)
            self._untrack(oldest_id, oldest_state.session_id)

    def create_cursor(self, data: List[Any], query_params: dict,
                      tool_name: str = "unknown",
                      session_id: str = "default",
                      grep_pattern: str = None, grep_flags: int = 0,
                      page_size: int = DEFAULT_PAGE_SIZE) -> tuple[str, CursorState]:
        """Create a new cursor for paginated results

        Args:
            data: The full result set to paginate
            query_params: Original query parameters (for hashing)
            tool_name: Name of tool creating cursor
            session_id: Session identifier for isolation
            grep_pattern: Optional regex pattern to filter results
            grep_flags: Regex flags (re.IGNORECASE, etc.)
            page_size: Items per page; clamped to the range 1..MAX_PAGE_SIZE

        Returns:
            Tuple of (cursor_id, cursor_state)

        Raises:
            ValueError: If grep_pattern is rejected by compile_safe_pattern
        """
        # Apply grep filtering if pattern provided (with ReDoS protection).
        # This runs before the lock is taken so a slow filter blocks nobody.
        filtered_data = data
        if grep_pattern:
            pattern = compile_safe_pattern(grep_pattern, grep_flags)
            filtered_data = [
                item for item in data
                if self._matches_grep(item, pattern)
            ]
        # Short fingerprint of the query, not a security measure; the flag
        # keeps md5 usable on builds that restrict it for security purposes.
        query_hash = hashlib.md5(
            json.dumps(query_params, sort_keys=True, default=str).encode(),
            usedforsecurity=False
        ).hexdigest()[:12]
        # A page size below 1 would make CursorState.current_page divide by
        # zero (or page backwards), so clamp at both ends.
        effective_page_size = max(1, min(page_size, MAX_PAGE_SIZE))
        with self._lock:
            self._cleanup_expired()
            cursor_id = self._generate_cursor_id(query_hash, session_id)
            state = CursorState(
                cursor_id=cursor_id,
                session_id=session_id,
                tool_name=tool_name,
                query_hash=query_hash,
                data=filtered_data,
                total_count=len(data),
                filtered_count=len(filtered_data),
                page_size=effective_page_size,
                grep_pattern=grep_pattern,
                grep_flags=grep_flags
            )
            self._cursors[cursor_id] = state
            # Track by session
            self._session_cursors.setdefault(session_id, set()).add(cursor_id)
            return cursor_id, state

    def get_cursor(self, cursor_id: str, session_id: str = None) -> Optional[CursorState]:
        """Retrieve a cursor by ID, optionally validating session

        A successful lookup refreshes the cursor's idle timer and marks it as
        most recently used.

        Args:
            cursor_id: The cursor identifier
            session_id: Optional session to validate against

        Returns:
            CursorState if found and valid, None otherwise
        """
        with self._lock:
            self._cleanup_expired()
            state = self._cursors.get(cursor_id)
            if state is None:
                return None
            if state.is_expired:
                del self._cursors[cursor_id]
                self._untrack(cursor_id, state.session_id)
                return None
            # Validate session if provided
            if session_id and not state.verify_session(session_id):
                return None
            state.last_accessed = time.time()
            # Move to end (most recently used)
            self._cursors.move_to_end(cursor_id)
            return state

    def advance_cursor(self, cursor_id: str, session_id: str = None) -> Optional[CursorState]:
        """Advance cursor to next page

        The offset is moved forward by one page unconditionally; callers are
        expected to consult ``has_more`` before advancing.

        Args:
            cursor_id: The cursor identifier
            session_id: Optional session to validate against

        Returns:
            Updated CursorState or None if invalid/expired
        """
        with self._lock:
            state = self._cursors.get(cursor_id)
            if not state or state.is_expired:
                return None
            if session_id and not state.verify_session(session_id):
                return None
            state.current_offset += state.page_size
            state.last_accessed = time.time()
            self._cursors.move_to_end(cursor_id)
            return state

    def delete_cursor(self, cursor_id: str, session_id: str = None) -> bool:
        """Explicitly delete a cursor

        Args:
            cursor_id: The cursor identifier
            session_id: Optional session to validate against

        Returns:
            True if deleted, False if not found or session mismatch
        """
        with self._lock:
            state = self._cursors.get(cursor_id)
            if state is None:
                return False
            if session_id and not state.verify_session(session_id):
                return False
            self._untrack(cursor_id, state.session_id)
            del self._cursors[cursor_id]
            return True

    def delete_session_cursors(self, session_id: str) -> int:
        """Delete all cursors for a session

        Args:
            session_id: The session identifier

        Returns:
            Number of cursors deleted
        """
        with self._lock:
            cursor_ids = self._session_cursors.pop(session_id, None)
            if cursor_ids is None:
                return 0
            count = 0
            for cid in cursor_ids:
                if self._cursors.pop(cid, None) is not None:
                    count += 1
            return count

    def get_page(self, state: CursorState) -> List[Any]:
        """Get current page of data from cursor state"""
        start = state.current_offset
        end = start + state.page_size
        return state.data[start:end]

    def _matches_grep(self, item: Any, pattern: re.Pattern, depth: int = 0) -> bool:
        """Check if an item matches the grep pattern

        Dict values are searched when they are strings, numbers or nested
        dicts/lists/tuples (keys and other value types are ignored); lists and
        tuples are searched element by element; anything else is searched via
        its ``str()`` form.

        Args:
            item: The item to search
            pattern: Compiled regex pattern
            depth: Current recursion depth (for stack overflow protection)

        Returns:
            True if pattern matches anywhere in the item
        """
        # Prevent stack overflow from deeply nested structures
        if depth > MAX_GREP_RECURSION_DEPTH:
            return False
        if isinstance(item, dict):
            for value in item.values():
                if isinstance(value, str):
                    if pattern.search(value):
                        return True
                elif isinstance(value, (int, float)):
                    if pattern.search(str(value)):
                        return True
                elif isinstance(value, (dict, list, tuple)):
                    if self._matches_grep(value, pattern, depth + 1):
                        return True
            return False
        if isinstance(item, (list, tuple)):
            return any(self._matches_grep(i, pattern, depth + 1) for i in item)
        if isinstance(item, str):
            return bool(pattern.search(item))
        return bool(pattern.search(str(item)))

    def list_cursors(self, session_id: str = None) -> List[dict]:
        """List active cursors, optionally filtered by session

        Args:
            session_id: Optional session filter

        Returns:
            List of cursor info dicts
        """
        with self._lock:
            self._cleanup_expired()
            return [
                {
                    "cursor_id": cid,
                    "session_id": state.session_id,
                    "tool_name": state.tool_name,
                    "total_count": state.total_count,
                    "filtered_count": state.filtered_count,
                    "current_page": state.current_page,
                    "total_pages": state.total_pages,
                    "current_offset": state.current_offset,
                    "page_size": state.page_size,
                    "has_more": state.has_more,
                    "grep_pattern": state.grep_pattern,
                    "age_seconds": int(time.time() - state.created_at),
                    "ttl_remaining": state.ttl_remaining
                }
                for cid, state in self._cursors.items()
                if session_id is None or state.session_id == session_id
            ]

    def get_stats(self) -> dict:
        """Get cursor manager statistics

        Only sessions that still own at least one cursor are counted.
        """
        with self._lock:
            self._cleanup_expired()
            return {
                "total_cursors": len(self._cursors),
                "total_sessions": len(self._session_cursors),
                "max_cache_size": CURSOR_MAX_CACHE_SIZE,
                "ttl_seconds": CURSOR_TTL_SECONDS,
                "cursors_per_session": {
                    sid: len(cids) for sid, cids in self._session_cursors.items()
                }
            }


# Global cursor manager instance
cursor_manager = CursorManager()
def estimate_tokens(data: List[Any]) -> int:
    """Roughly estimate how many tokens ``data`` occupies once JSON-encoded.

    The estimate is the length of the JSON text (values that are not JSON
    serializable are rendered with ``str``) divided by
    ``TOKEN_ESTIMATION_RATIO`` and truncated to an int.
    """
    serialized = json.dumps(data, default=str)
    char_count = len(serialized)
    return int(char_count / TOKEN_ESTIMATION_RATIO)
def paginate_response(data: List[Any], query_params: dict,
                      tool_name: str = "unknown",
                      session_id: str = "default",
                      page_size: int = DEFAULT_PAGE_SIZE,
                      grep: str = None, grep_ignorecase: bool = True,
                      return_all: bool = False) -> dict:
    """Build the response dict for a (possibly grep-filtered) result list.

    With ``return_all`` the filtered list is returned in one piece together
    with a token estimate and, for large results, a size warning. Otherwise a
    cursor is created and the first page is returned; the cursor ID is only
    exposed when further pages exist.

    Args:
        data: Full result list to paginate
        query_params: Original query parameters (for cursor creation)
        tool_name: Name of the tool creating this response
        session_id: Session identifier for cursor isolation
        page_size: Items per page (default: 50, max: 500)
        grep: Optional regex pattern to filter results
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Bypass pagination and return all results (with warning)

    Returns:
        dict with pagination metadata and results, or an error dict with code
        INVALID_GREP_PATTERN when the grep pattern is rejected
    """
    def invalid_grep(exc: ValueError) -> dict:
        # Shared error payload for a rejected grep pattern
        return {
            "success": False,
            "error": {
                "code": "INVALID_GREP_PATTERN",
                "message": str(exc)
            },
            "timestamp": int(time.time() * 1000)
        }

    flags = re.IGNORECASE if grep_ignorecase else 0

    if return_all:
        # Pagination is bypassed, but grep filtering still applies
        matches = data
        if grep:
            try:
                compiled = compile_safe_pattern(grep, flags)
                matches = [
                    entry for entry in data
                    if cursor_manager._matches_grep(entry, compiled)
                ]
            except ValueError as exc:
                return invalid_grep(exc)

        token_estimate = estimate_tokens(matches)
        if token_estimate <= 8000:
            size_warning = None
        elif token_estimate <= 20000:
            size_warning = f"⚠️ Large response (~{token_estimate:,} tokens)"
        elif token_estimate <= 50000:
            size_warning = f"⚠️ VERY LARGE response (~{token_estimate:,} tokens) - consider using pagination"
        else:
            size_warning = f"🚨 EXTREMELY LARGE response (~{token_estimate:,} tokens) - may cause issues"

        return {
            "success": True,
            "result": matches,
            "pagination": {
                "bypassed": True,
                "total_count": len(data),
                "filtered_count": len(matches),
                "grep_pattern": grep,
                "estimated_tokens": token_estimate,
                "warning": size_warning
            },
            "timestamp": int(time.time() * 1000)
        }

    # Regular paginated path: the cursor manager filters and slices
    try:
        cursor_id, state = cursor_manager.create_cursor(
            data=data,
            query_params=query_params,
            tool_name=tool_name,
            session_id=session_id,
            grep_pattern=grep,
            grep_flags=flags,
            page_size=page_size
        )
    except ValueError as exc:
        return invalid_grep(exc)

    page_items = cursor_manager.get_page(state)
    more_available = state.has_more

    response = {
        "success": True,
        "result": page_items,
        "pagination": {
            # The cursor is only worth handing out when it can be continued
            "cursor_id": cursor_id if more_available else None,
            "session_id": session_id,
            "total_count": state.total_count,
            "filtered_count": state.filtered_count,
            "page_size": state.page_size,
            "current_page": state.current_page,
            "total_pages": state.total_pages,
            "has_more": more_available,
            "grep_pattern": grep,
            "items_returned": len(page_items),
        },
        "timestamp": int(time.time() * 1000)
    }

    # Prominent continuation hint aimed at LLM clients
    if more_available:
        remaining = state.filtered_count - (state.current_page * state.page_size)
        response["_message"] = (
            f"📄 Showing {len(page_items)} of {state.filtered_count} items "
            f"(page {state.current_page}/{state.total_pages}). "
            f"To get the next {min(state.page_size, remaining)} items, call: "
            f"cursor_next(cursor_id='{cursor_id}')"
        )
    else:
        response["_message"] = f"✅ Complete: {len(page_items)} items returned (all results)"
    return response
# ================= End Cursor System =================
# Usage guide for clients; passed verbatim to FastMCP below as the server's
# `instructions` text.
instructions = """
MCGhidra allows interacting with multiple Ghidra SRE instances. Ghidra SRE is a tool for reverse engineering and analyzing binaries, e.g. malware.
First, run `instances_list()` to see all available Ghidra instances (automatically discovers instances on the default host).
Then use `instances_use(port)` to set your working instance.
Note: Use `instances_discover(host)` only if you need to scan a different host.
The API is organized into namespaces for different types of operations:
- instances_* : For managing Ghidra instances
- functions_* : For working with functions
- data_* : For working with data items
- structs_* : For creating and managing struct data types
- memory_* : For memory access
- xrefs_* : For cross-references
- analysis_* : For program analysis
- cursor_* : For pagination cursor management
## Pagination System
The following tools support cursor-based pagination with grep filtering:
- `functions_list` - List functions (can be 10K+)
- `functions_decompile` - Decompiled code lines (grep for patterns like "if.*NULL")
- `functions_disassemble` - Assembly instructions (grep for "CALL", "JMP", etc.)
- `functions_get_variables` - Function variables (grep for "local_", "param", etc.)
- `data_list` - List data items
- `data_list_strings` - List string data
- `xrefs_list` - List cross-references (can be very large for common functions)
- `structs_list` - List struct types
- `structs_get` - Struct fields (grep for field names/types in large structs)
- `analysis_get_callgraph` - Call graph edges (grep for function names)
- `analysis_get_dataflow` - Data flow steps (grep for opcodes/registers)
Pagination parameters:
- `page_size`: Items per page (default: 50, max: 500)
- `grep`: Regex pattern to filter results (e.g., "main|init", "FUN_00.*")
- `grep_ignorecase`: Case-insensitive grep (default: True)
- `return_all`: Bypass pagination and return all results (use with caution)
When results are paginated, the response includes a `_message` field with instructions.
Use `cursor_next(cursor_id)` to fetch the next page of results.
Use `cursor_list()` to see active cursors.
Use `cursor_delete(cursor_id)` to clean up cursors.
"""
# The MCP server object, created at import time.
mcp = FastMCP("MCGhidra", instructions=instructions)
# Host used when building default instance URLs (http://<host>:<port>);
# overridable through the GHIDRA_HYDRA_HOST environment variable.
ghidra_host = os.environ.get("GHIDRA_HYDRA_HOST", DEFAULT_GHIDRA_HOST)
# Helper function to get the current instance or validate a specific port
def _get_instance_port(port=None):
    """Resolve the port to operate on and make sure an instance is known there.

    A falsy ``port`` selects ``current_instance_port``. If the port is not in
    ``active_instances`` one registration attempt is made before giving up.

    Args:
        port: Port of the Ghidra instance, or None for the current instance

    Returns:
        The resolved port number

    Raises:
        ValueError: If no instance is registered on the port even after the
            registration attempt
    """
    resolved = port if port else current_instance_port
    if resolved in active_instances:
        return resolved
    # Unknown so far: try to register it on the fly
    register_instance(resolved)
    if resolved in active_instances:
        return resolved
    raise ValueError(f"No active Ghidra instance on port {resolved}")
# ================= HTTP Helpers =================
def get_instance_url(port: int) -> str:
    """Get URL for a Ghidra instance by port

    Returns the URL recorded for a registered instance. For an unknown port
    in the range 8192..65535 one registration attempt is made first. If the
    port is still unknown, the default ``http://<ghidra_host>:<port>`` URL is
    returned.

    Args:
        port: Port number of the Ghidra instance

    Returns:
        Base URL of the instance (no trailing slash is added here)
    """
    with instances_lock:
        instance = active_instances.get(port)
    if instance is not None:
        return instance["url"]

    if 8192 <= port <= 65535:
        # register_instance() performs blocking HTTP requests, so it is called
        # with the lock released: other threads are not stalled meanwhile.
        # NOTE(review): instances_lock is a plain, non-reentrant Lock; if
        # register_instance() acquires it itself, calling it while holding the
        # lock would deadlock - confirm against its full body.
        register_instance(port)
        with instances_lock:
            instance = active_instances.get(port)
        if instance is not None:
            return instance["url"]

    return f"http://{ghidra_host}:{port}"
def validate_origin(headers: dict) -> bool:
    """Validate request origin against allowed origins

    The ``Origin`` header is reduced to ``scheme://hostname`` (plus ``:port``
    when a port is given) and compared with the entries of
    ``ALLOWED_ORIGINS``.

    Args:
        headers: Request headers; only the "Origin" key is consulted

    Returns:
        True when there is no Origin header or the origin is allowed, False
        when the origin is malformed or not in the allow-list
    """
    origin = headers.get("Origin")
    if not origin:
        # No origin header - allow (browser same-origin policy applies)
        return True
    # Parse origin to get scheme+hostname. Only parsing failures are treated
    # as "not allowed": ValueError covers malformed netlocs and ports, and
    # TypeError/AttributeError cover a non-string header value. Anything else
    # (e.g. KeyboardInterrupt) is left to propagate.
    try:
        parsed = urlparse(origin)
        origin_base = f"{parsed.scheme}://{parsed.hostname}"
        if parsed.port:
            origin_base += f":{parsed.port}"
    except (ValueError, TypeError, AttributeError):
        return False
    return origin_base in ALLOWED_ORIGINS
def _make_request(method: str, port: int, endpoint: str, params: dict = None,
                  json_data: dict = None, data: str = None,
                  headers: dict = None) -> dict:
    """Internal helper to make HTTP requests and handle common errors.

    Never raises for transport problems: timeouts, connection failures,
    non-JSON replies and unexpected exceptions are all converted into an
    error dict of the form
    ``{"success": False, "error": {"code", "message"}, ..., "timestamp"}``.

    Args:
        method: HTTP method name
        port: Port of the Ghidra instance to contact
        endpoint: Path appended to the instance URL after a "/"
        params: Optional query parameters
        json_data: Optional JSON body
        data: Optional plain-text body (used when json_data is None)
        headers: Optional extra request headers

    Returns:
        The parsed JSON reply (with a "timestamp" added to dict replies that
        lack one), or an error dict as described above
    """
    def failure(code: str, message: str, **extra) -> dict:
        # Uniform error payload; `extra` carries status_code and friends
        return {
            "success": False,
            "error": {
                "code": code,
                "message": message
            },
            **extra,
            "timestamp": int(time.time() * 1000)
        }

    url = f"{get_instance_url(port)}/{endpoint}"
    # Set up headers according to HATEOAS API expected format
    request_headers = {
        'Accept': 'application/json',
        'X-Request-ID': f"mcp-bridge-{int(time.time() * 1000)}"
    }
    if headers:
        request_headers.update(headers)

    if method.upper() in ("POST", "PUT", "PATCH", "DELETE"):
        # Validate the Origin wherever it may have been supplied. The safe_*
        # wrappers move a "headers" entry out of the JSON body into `headers`,
        # so looking only inside json_data would skip the check for every
        # JSON request; an embedded "headers" dict is still honoured too.
        origin_sources = [headers or {}]
        if isinstance(json_data, dict) and isinstance(json_data.get("headers"), dict):
            origin_sources.append(json_data["headers"])
        if not all(validate_origin(source) for source in origin_sources):
            return failure(
                "ORIGIN_NOT_ALLOWED",
                "Origin not allowed for state-changing request",
                status_code=403
            )

    if json_data is not None:
        request_headers['Content-Type'] = 'application/json'
    elif data is not None:
        request_headers['Content-Type'] = 'text/plain'

    try:
        response = requests.request(
            method,
            url,
            params=params,
            json=json_data,
            data=data,
            headers=request_headers,
            timeout=10
        )
        try:
            parsed_json = response.json()
        except ValueError:
            # Body is not JSON: report it, keeping a snippet for diagnosis
            if response.ok:
                return failure(
                    "NON_JSON_RESPONSE",
                    "Received non-JSON success response from Ghidra plugin",
                    status_code=response.status_code,
                    response_text=response.text[:500]
                )
            return failure(
                f"HTTP_{response.status_code}",
                f"Non-JSON error response: {response.text[:100]}...",
                status_code=response.status_code,
                response_text=response.text[:500]
            )

        # Add timestamp if not present
        if isinstance(parsed_json, dict) and "timestamp" not in parsed_json:
            parsed_json["timestamp"] = int(time.time() * 1000)
        # Normalize a failed reply whose "error" is a bare value into the
        # {"code", "message"} shape used everywhere else
        if (not response.ok and isinstance(parsed_json, dict)
                and "success" in parsed_json and not parsed_json["success"]):
            if "error" in parsed_json and not isinstance(parsed_json["error"], dict):
                error_message = parsed_json["error"]
                parsed_json["error"] = {
                    "code": f"HTTP_{response.status_code}",
                    "message": error_message
                }
        return parsed_json
    except requests.exceptions.Timeout:
        return failure("REQUEST_TIMEOUT", "Request timed out", status_code=408)
    except requests.exceptions.ConnectionError:
        return failure(
            "CONNECTION_ERROR",
            f"Failed to connect to Ghidra instance at {url}",
            status_code=503
        )
    except Exception as e:
        # Top-level boundary: callers always get a dict back, never a raise
        return failure(
            "UNEXPECTED_ERROR",
            f"An unexpected error occurred: {str(e)}",
            exception=e.__class__.__name__
        )
def safe_get(port: int, endpoint: str, params: dict = None) -> dict:
    """Issue a GET to a Ghidra instance and return the resulting dict.

    Args:
        port: Port of the Ghidra instance
        endpoint: API path relative to the instance URL
        params: Optional query parameters

    Returns:
        Whatever _make_request returns for the call
    """
    reply = _make_request("GET", port, endpoint, params=params)
    return reply
def safe_put(port: int, endpoint: str, data: dict) -> dict:
    """Make PUT request to Ghidra instance with JSON payload

    A "headers" entry in ``data`` is sent as HTTP headers rather than as part
    of the JSON body. The caller's dict is left untouched.

    Args:
        port: Port of the Ghidra instance
        endpoint: API path relative to the instance URL
        data: JSON payload, optionally carrying a "headers" entry

    Returns:
        Whatever _make_request returns for the call
    """
    headers = None
    if isinstance(data, dict):
        # Work on a shallow copy so that "headers" is not stripped from the
        # caller's dict (which would break a retry with the same payload).
        data = dict(data)
        headers = data.pop("headers", None)
    return _make_request("PUT", port, endpoint, json_data=data, headers=headers)
def safe_post(port: int, endpoint: str, data: Union[dict, str]) -> dict:
    """Perform a POST request to a specific Ghidra instance with JSON or text payload

    A dict is sent as the JSON body, with any "headers" entry sent as HTTP
    headers instead; anything else is sent as the plain-text body. The
    caller's dict is left untouched.

    Args:
        port: Port of the Ghidra instance
        endpoint: API path relative to the instance URL
        data: JSON payload (dict) or text payload

    Returns:
        Whatever _make_request returns for the call
    """
    headers = None
    json_payload = None
    text_payload = None
    if isinstance(data, dict):
        # Shallow copy: do not strip "headers" from the caller's dict
        json_payload = dict(data)
        headers = json_payload.pop("headers", None)
    else:
        text_payload = data
    return _make_request("POST", port, endpoint, json_data=json_payload, data=text_payload, headers=headers)
def safe_patch(port: int, endpoint: str, data: dict) -> dict:
    """Perform a PATCH request to a specific Ghidra instance with JSON payload

    A "headers" entry in ``data`` is sent as HTTP headers rather than as part
    of the JSON body. The caller's dict is left untouched.

    Args:
        port: Port of the Ghidra instance
        endpoint: API path relative to the instance URL
        data: JSON payload, optionally carrying a "headers" entry

    Returns:
        Whatever _make_request returns for the call
    """
    headers = None
    if isinstance(data, dict):
        # Work on a shallow copy so that "headers" is not stripped from the
        # caller's dict (which would break a retry with the same payload).
        data = dict(data)
        headers = data.pop("headers", None)
    return _make_request("PATCH", port, endpoint, json_data=data, headers=headers)
def safe_delete(port: int, endpoint: str) -> dict:
    """Issue a DELETE to a Ghidra instance and return the resulting dict.

    Args:
        port: Port of the Ghidra instance
        endpoint: API path relative to the instance URL

    Returns:
        Whatever _make_request returns for the call
    """
    reply = _make_request("DELETE", port, endpoint)
    return reply
def simplify_response(response: dict) -> dict:
    """
    Flatten a HATEOAS response into a shape that is easier for agents to read

    - ``_links`` entries are removed from the result (object or list items)
      and each link that has an ``href`` becomes a ``<name>_url`` field
    - top-level ``_links`` become an ``api_links`` name -> href mapping
    - a result's ``instructions`` list is replaced by ``disassembly_text``,
      one "address: bytes mnemonic operands" line per instruction
    - ``ccode`` (or else ``decompiled``) is mirrored as ``decompiled_text``

    The input is not modified; only shallow copies are changed. Anything that
    is not a dict is returned unchanged.
    """
    if not isinstance(response, dict):
        return response

    def link_targets(links):
        # Yield (name, href) for every well-formed entry of a _links mapping
        if isinstance(links, dict):
            for name, target in links.items():
                if isinstance(target, dict) and "href" in target:
                    yield name, target["href"]

    def strip_links(entry: dict) -> dict:
        # Copy of `entry` with _links replaced by direct "<name>_url" fields
        flattened = entry.copy()
        for name, href in link_targets(flattened.pop("_links", None)):
            flattened[f"{name}_url"] = href
        return flattened

    def render_instruction(instr: dict) -> str:
        # Format: address: bytes mnemonic operands
        address = instr.get("address", "")
        mnemonic = instr.get("mnemonic", "")
        operands = instr.get("operands", "")
        raw_bytes = instr.get("bytes", "")
        return f"{address}: {raw_bytes.ljust(10)} {mnemonic} {operands}\n"

    simplified = response.copy()

    # Remember API metadata so it can be put back if it ever goes missing
    preserved = {
        key: simplified.get(key)
        for key in ("id", "instance", "timestamp", "size", "offset", "limit")
        if key in simplified
    }

    if "result" in simplified:
        payload = simplified["result"]
        if isinstance(payload, list):
            simplified["result"] = [
                strip_links(entry) if isinstance(entry, dict) else entry
                for entry in payload
            ]
        elif isinstance(payload, dict):
            body = strip_links(payload)
            instructions = body.get("instructions")
            if isinstance(instructions, list):
                # Text is easier to consume than structured instructions;
                # entries that are not dicts are left out
                body["disassembly_text"] = "".join(
                    render_instruction(instr)
                    for instr in instructions
                    if isinstance(instr, dict)
                )
                body.pop("instructions", None)
            # Expose decompiled code under one predictable key
            for source_key in ("ccode", "decompiled"):
                if source_key in body:
                    body["decompiled_text"] = body[source_key]
                    break
            simplified["result"] = body

    # Top-level links become a plain name -> href mapping
    api_links = dict(link_targets(simplified.pop("_links", None)))
    if api_links:
        simplified["api_links"] = api_links

    for key, value in preserved.items():
        simplified.setdefault(key, value)
    return simplified
def _copy_program_metadata(project_info: dict, program: dict) -> None:
    """Copy the tracked fields of a ``/program`` result dict into *project_info*."""
    # programId has the form "project:/file"; split on the first colon only so
    # any later colons remain part of the file path.
    program_id = program.get("programId", "")
    if ":" in program_id:
        project_name, file_path = program_id.split(":", 1)
        project_info["project"] = project_name
        # Drop a single leading slash from the file path if present
        project_info["path"] = file_path.removeprefix("/")
    # File name comes directly from the result, not from programId
    project_info["file"] = program.get("name", "")
    project_info["language_id"] = program.get("languageId", "")
    project_info["compiler_spec_id"] = program.get("compilerSpecId", "")
    # ghidra_instance() reads this field as "imageBase" while this code used
    # "image_base"; accept either spelling.
    # NOTE(review): confirm which key the plugin actually emits.
    project_info["image_base"] = program.get("imageBase", program.get("image_base", ""))
    # Keep _links from the result for HATEOAS navigation
    if "_links" in program:
        project_info["_links"] = program.get("_links", {})


def register_instance(port: int, url: Optional[str] = None) -> str:
    """Register a new Ghidra instance

    Probes ``{url}/plugin-version``, checks the reported API version against
    REQUIRED_API_VERSION, gathers program metadata from ``{url}/program`` on a
    best-effort basis, and stores the result in ``active_instances[port]``.

    All diagnostics are written to stderr; stdout is left untouched so that a
    stdio-based MCP transport is not polluted with non-protocol output.

    Args:
        port: Port number of the Ghidra instance
        url: Optional URL if different from default http://host:port

    Returns:
        str: Confirmation message or error. Connection failures are prefixed
        with "Error:"; an API version mismatch returns the mismatch message
        itself and does not register the instance.
    """
    if url is None:
        url = f"http://{ghidra_host}:{port}"

    try:
        # Check for HATEOAS API by checking plugin-version endpoint
        response = requests.get(f"{url}/plugin-version", timeout=2)
        if not response.ok:
            return f"Error: Instance at {url} is not responding properly to HATEOAS API"

        project_info = {"url": url}

        # Check plugin version to ensure compatibility. A malformed body is
        # logged but does not block registration.
        try:
            version_data = response.json()
            if "result" in version_data:
                result = version_data["result"]
                if isinstance(result, dict):
                    plugin_version = result.get("plugin_version", "")
                    api_version = result.get("api_version", 0)
                    project_info["plugin_version"] = plugin_version
                    project_info["api_version"] = api_version

                    # Verify API version compatibility
                    if api_version != REQUIRED_API_VERSION:
                        error_msg = f"API version mismatch: Plugin reports version {api_version}, but bridge requires version {REQUIRED_API_VERSION}"
                        print(error_msg, file=sys.stderr)
                        return error_msg

                    print(
                        f"Connected to Ghidra plugin version {plugin_version} with API version {api_version}",
                        file=sys.stderr,
                    )
        except Exception as e:
            print(f"Error parsing plugin version: {e}", file=sys.stderr)

        # Get program info from HATEOAS API. Non-critical: registration
        # continues even if this lookup or its parsing fails.
        try:
            info_response = requests.get(f"{url}/program", timeout=2)
            if info_response.ok:
                try:
                    info_data = info_response.json()
                    if "result" in info_data:
                        result = info_data["result"]
                        if isinstance(result, dict):
                            _copy_program_metadata(project_info, result)
                except Exception as e:
                    print(f"Error parsing info endpoint: {e}", file=sys.stderr)
        except Exception as e:
            print(f"Error connecting to info endpoint: {e}", file=sys.stderr)

        with instances_lock:
            active_instances[port] = project_info

        return f"Registered instance on port {port} at {url}"
    except Exception as e:
        return f"Error: Could not connect to instance at {url}: {str(e)}"
def _discover_instances(port_range, host=None, timeout=0.5) -> dict:
    """Internal function to discover NEW Ghidra instances by scanning ports

    This function only returns newly discovered instances that weren't already
    in the active_instances registry. Use instances_discover() for a complete
    list including already known instances.

    Args:
        port_range: Iterable of port numbers to probe
        host: Host to scan; falls back to the module-level ghidra_host when None
        timeout: Per-request timeout in seconds for the probe

    Returns:
        dict: {"found": <count>, "instances": [<per-instance report>, ...]}.
        Each report carries the message returned by register_instance() under
        "result", so an instance whose registration was refused still appears.
    """
    found_instances = []
    scan_host = host if host is not None else ghidra_host

    for port in port_range:
        if port in active_instances:
            continue  # Skip already known instances

        url = f"http://{scan_host}:{port}"
        try:
            # Try HATEOAS API via plugin-version endpoint
            response = requests.get(
                f"{url}/plugin-version",
                headers={
                    'Accept': 'application/json',
                    'X-Request-ID': f"discovery-{int(time.time() * 1000)}",
                },
                timeout=timeout,
            )
        except requests.exceptions.RequestException:
            # Instance not available, just continue
            continue

        if not response.ok:
            continue

        # Further validate it's a MCGhidra instance by checking response format
        try:
            json_data = response.json()
        except ValueError:
            print(f"Port {port} returned non-HATEOAS response", file=sys.stderr)
            continue

        # An unrelated service may answer with valid JSON that is not an
        # object (null, a number, a string); treat that as "not ours" instead
        # of letting a TypeError abort the scan of the remaining ports.
        if not isinstance(json_data, dict):
            print(f"Port {port} returned non-HATEOAS response", file=sys.stderr)
            continue

        if not (json_data.get("success") and "result" in json_data):
            continue

        # Looks like a valid HATEOAS API response. register_instance() does
        # its own probing; the version details are extracted here as well so
        # the discovery report is complete even if registration is refused.
        result = register_instance(port, url)

        instance_info = {
            "port": port,
            "url": url
        }

        # Extract version info for reporting
        version_info = json_data["result"]
        if isinstance(version_info, dict):
            instance_info["plugin_version"] = version_info.get("plugin_version", "unknown")
            instance_info["api_version"] = version_info.get("api_version", "unknown")
        else:
            instance_info["plugin_version"] = "unknown"
            instance_info["api_version"] = "unknown"

        # Include project details from the registered instance. A single
        # lookup avoids a KeyError if the entry is removed concurrently.
        registered = active_instances.get(port)
        if registered is not None:
            instance_info["project"] = registered.get("project", "")
            instance_info["file"] = registered.get("file", "")

        instance_info["result"] = result
        found_instances.append(instance_info)

    return {
        "found": len(found_instances),
        "instances": found_instances
    }
def periodic_discovery():
    """Periodically discover new instances

    Runs forever: each cycle scans FULL_DISCOVERY_RANGE for new instances,
    health-checks every registered instance via its plugin-version endpoint,
    refreshes the stored program metadata, removes instances that no longer
    respond, then sleeps 30 seconds.

    All diagnostics go to stderr; stdout is left untouched so that a
    stdio-based MCP transport is not polluted with non-protocol output.
    """
    while True:
        try:
            _discover_instances(FULL_DISCOVERY_RANGE, timeout=0.5)

            # NOTE(review): instances_lock is held while the HTTP health
            # checks below run, so other lock users wait for the whole sweep.
            with instances_lock:
                ports_to_remove = []
                for port, info in active_instances.items():
                    url = info["url"]
                    try:
                        # Check HATEOAS API via plugin-version endpoint
                        response = requests.get(f"{url}/plugin-version", timeout=1)
                        if not response.ok:
                            ports_to_remove.append(port)
                            continue

                        # Update program info if available (especially to get project name)
                        try:
                            info_response = requests.get(f"{url}/program", timeout=1)
                            if info_response.ok:
                                try:
                                    info_data = info_response.json()
                                    if "result" in info_data:
                                        result = info_data["result"]
                                        if isinstance(result, dict):
                                            # Extract project and file from programId (format: "project:/file")
                                            program_id = result.get("programId", "")
                                            if ":" in program_id:
                                                project_name, file_path = program_id.split(":", 1)
                                                info["project"] = project_name
                                                # Remove leading slash from file path if present
                                                if file_path.startswith("/"):
                                                    file_path = file_path[1:]
                                                info["path"] = file_path
                                            # Get file name directly from the result
                                            info["file"] = result.get("name", "")
                                            # Get other metadata
                                            info["language_id"] = result.get("languageId", "")
                                            info["compiler_spec_id"] = result.get("compilerSpecId", "")
                                            # ghidra_instance() reads "imageBase"; accept either spelling.
                                            # NOTE(review): confirm which key the plugin emits.
                                            info["image_base"] = result.get(
                                                "imageBase", result.get("image_base", "")
                                            )
                                except Exception as e:
                                    print(f"Error parsing info endpoint during discovery: {e}", file=sys.stderr)
                        except Exception:
                            # Deliberately best-effort: a failed metadata
                            # refresh must not evict an instance that passed
                            # the health check above.
                            pass
                    except requests.exceptions.RequestException:
                        ports_to_remove.append(port)

                # Removal is deferred so the registry is not mutated while
                # it is being iterated.
                for port in ports_to_remove:
                    del active_instances[port]
                    print(f"Removed unreachable instance on port {port}", file=sys.stderr)
        except Exception as e:
            print(f"Error in periodic discovery: {e}", file=sys.stderr)

        time.sleep(30)
def handle_sigint(signum, frame):
    """Terminate the process immediately with exit status 0.

    The (signum, frame) signature matches what signal handlers receive; both
    arguments are ignored. os._exit() ends the process without running
    interpreter cleanup handlers or flushing stdio buffers.
    """
    exit_status = 0
    os._exit(exit_status)
# ================= MCP Resources =================
# Resources provide information that can be loaded directly into context
# They focus on data and minimize metadata
@mcp.resource(uri="ghidra://instance/{port}")
def ghidra_instance(port: int = None) -> dict:
    """Get detailed information about a Ghidra instance and the loaded program

    Args:
        port: Specific Ghidra instance port (optional, uses current if omitted)

    Returns:
        dict: Detailed information about the Ghidra instance and loaded program.
        On failure the dict contains an "error" key instead.
    """
    port = _get_instance_port(port)
    response = safe_get(port, "program")

    if not isinstance(response, dict) or not response.get("success", False):
        return {"error": f"Unable to access Ghidra instance on port {port}"}

    # Extract only the most relevant information for the resource
    result = response.get("result", {})
    if not isinstance(result, dict):
        return {
            "success": False,
            "error": {
                "code": "INVALID_RESPONSE",
                "message": "Invalid response format from Ghidra instance"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_info = {
        "port": port,
        "url": get_instance_url(port),
        "program_name": result.get("name", "unknown"),
        "program_id": result.get("programId", "unknown"),
        "language": result.get("languageId", "unknown"),
        "compiler": result.get("compilerSpecId", "unknown"),
        # register_instance() reads this field as "image_base"; accept either
        # spelling. NOTE(review): confirm which key the plugin emits.
        "base_address": result.get("imageBase", result.get("image_base", "0x0")),
        "memory_size": result.get("memorySize", 0),
        "analysis_complete": result.get("analysisComplete", False)
    }

    # Add project information if available. The port may answer without ever
    # having been registered, so look it up defensively instead of indexing
    # active_instances directly (which raised KeyError).
    registered = active_instances.get(port)
    if isinstance(registered, dict) and "project" in registered:
        instance_info["project"] = registered["project"]

    return instance_info
@mcp.resource(uri="ghidra://instance/{port}/function/decompile/address/{address}")
def decompiled_function_by_address(port: int = None, address: str = None) -> str:
    """Return the decompiled C code of the function at an address

    Args:
        port: Specific Ghidra instance port
        address: Function address in hex format

    Returns:
        str: The decompiled C code, or an error message when the address is
        missing, the request fails, or no code field is found in the response
    """
    if not address:
        return "Error: Address parameter is required"

    target_port = _get_instance_port(port)
    simplified = simplify_response(
        safe_get(
            target_port,
            f"functions/{address}/decompile",
            {"syntax_tree": "false", "style": "normalize"},
        )
    )

    # A resource returns the bare code text, so failures become plain strings
    succeeded = (
        isinstance(simplified, dict)
        and simplified.get("success", False)
        and "result" in simplified
    )
    if not succeeded:
        fallback = "Error: Could not decompile function"
        if not (isinstance(simplified, dict) and "error" in simplified):
            return fallback
        failure = simplified["error"]
        if isinstance(failure, dict):
            return failure.get("message", fallback)
        return str(failure)

    # The code may live under any of several field names; first match wins
    payload = simplified["result"]
    if isinstance(payload, dict):
        code_field = next(
            (field for field in ("decompiled_text", "ccode", "decompiled") if field in payload),
            None,
        )
        if code_field is not None:
            return payload[code_field]

    return "Error: Could not extract decompiled code from response"
@mcp.resource(uri="ghidra://instance/{port}/function/decompile/name/{name}")
def decompiled_function_by_name(port: int = None, name: str = None) -> str:
    """Return the decompiled C code of the function with a given name

    Args:
        port: Specific Ghidra instance port
        name: Function name (URL-quoted before being placed in the request path)

    Returns:
        str: The decompiled C code, or an error message when the name is
        missing, the request fails, or no code field is found in the response
    """
    if not name:
        return "Error: Name parameter is required"

    target_port = _get_instance_port(port)
    simplified = simplify_response(
        safe_get(
            target_port,
            f"functions/by-name/{quote(name)}/decompile",
            {"syntax_tree": "false", "style": "normalize"},
        )
    )

    # A resource returns the bare code text, so failures become plain strings
    succeeded = (
        isinstance(simplified, dict)
        and simplified.get("success", False)
        and "result" in simplified
    )
    if not succeeded:
        fallback = "Error: Could not decompile function"
        if not (isinstance(simplified, dict) and "error" in simplified):
            return fallback
        failure = simplified["error"]
        if isinstance(failure, dict):
            return failure.get("message", fallback)
        return str(failure)

    # The code may live under any of several field names; first match wins
    payload = simplified["result"]
    if isinstance(payload, dict):
        code_field = next(
            (field for field in ("decompiled_text", "ccode", "decompiled") if field in payload),
            None,
        )
        if code_field is not None:
            return payload[code_field]

    return "Error: Could not extract decompiled code from response"
@mcp.resource(uri="ghidra://instance/{port}/function/info/address/{address}")
def function_info_by_address(port: int = None, address: str = None) -> dict:
    """Fetch the function record for the function at an address

    Args:
        port: Specific Ghidra instance port
        address: Function address in hex format

    Returns:
        dict: The "result" payload of the simplified response on success;
        otherwise an error envelope with "success": False, an "error" dict
        and a millisecond "timestamp"
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    simplified = simplify_response(safe_get(target_port, f"functions/{address}"))

    lookup_ok = (
        isinstance(simplified, dict)
        and simplified.get("success", False)
        and "result" in simplified
    )
    if lookup_ok:
        # Hand back only the function data, without the API metadata
        return simplified["result"]

    upstream_error = simplified.get("error") if isinstance(simplified, dict) else None
    return {
        "success": False,
        "error": {
            "code": "FUNCTION_NOT_FOUND",
            "message": "Could not get function information",
            "details": upstream_error
        },
        "timestamp": int(time.time() * 1000)
    }
@mcp.resource(uri="ghidra://instance/{port}/function/info/name/{name}")
def function_info_by_name(port: int = None, name: str = None) -> dict:
    """Fetch the function record for the function with a given name

    Args:
        port: Specific Ghidra instance port
        name: Function name (URL-quoted before being placed in the request path)

    Returns:
        dict: The "result" payload of the simplified response on success;
        otherwise an error envelope with "success": False, an "error" dict
        and a millisecond "timestamp"
    """
    if not name:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Name parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    simplified = simplify_response(
        safe_get(target_port, f"functions/by-name/{quote(name)}")
    )

    lookup_ok = (
        isinstance(simplified, dict)
        and simplified.get("success", False)
        and "result" in simplified
    )
    if lookup_ok:
        # Hand back only the function data, without the API metadata
        return simplified["result"]

    upstream_error = simplified.get("error") if isinstance(simplified, dict) else None
    return {
        "success": False,
        "error": {
            "code": "FUNCTION_NOT_FOUND",
            "message": "Could not get function information",
            "details": upstream_error
        },
        "timestamp": int(time.time() * 1000)
    }
@mcp.resource(uri="ghidra://instance/{port}/function/disassembly/address/{address}")
def disassembly_by_address(port: int = None, address: str = None) -> str:
    """Return the disassembly listing of the function at an address

    Args:
        port: Specific Ghidra instance port
        address: Function address in hex format

    Returns:
        str: Formatted disassembly listing, or an error message when the
        address is missing, the request fails, or no listing can be extracted
    """
    if not address:
        return "Error: Address parameter is required"

    target_port = _get_instance_port(port)
    simplified = simplify_response(
        safe_get(target_port, f"functions/{address}/disassembly")
    )

    succeeded = (
        isinstance(simplified, dict)
        and simplified.get("success", False)
        and "result" in simplified
    )
    if not succeeded:
        fallback = "Error: Could not get disassembly"
        if not (isinstance(simplified, dict) and "error" in simplified):
            return fallback
        failure = simplified["error"]
        if isinstance(failure, dict):
            return failure.get("message", fallback)
        return str(failure)

    # A resource returns just the listing text; try the known shapes in order
    listing = simplified["result"]
    if isinstance(listing, dict):
        # 1) Text already rendered upstream
        if "disassembly_text" in listing:
            return listing["disassembly_text"]

        # 2) Raw instruction records: render "address: bytes mnemonic operands"
        raw_instructions = listing.get("instructions")
        if isinstance(raw_instructions, list):
            def render(entry):
                location = entry.get("address", "")
                opcode = entry.get("mnemonic", "")
                args = entry.get("operands", "")
                raw_bytes = entry.get("bytes", "")
                return f"{location}: {raw_bytes.ljust(10)} {opcode} {args}\n"

            return "".join(
                render(entry) for entry in raw_instructions if isinstance(entry, dict)
            )

        # 3) A direct disassembly field
        if "disassembly" in listing:
            return listing["disassembly"]

    return "Error: Could not extract disassembly from response"
@mcp.resource(uri="ghidra://instance/{port}/function/disassembly/name/{name}")
def disassembly_by_name(port: int = None, name: str = None) -> str:
    """Return the disassembly listing of the function with a given name

    Args:
        port: Specific Ghidra instance port
        name: Function name (URL-quoted before being placed in the request path)

    Returns:
        str: Formatted disassembly listing, or an error message when the
        name is missing, the request fails, or no listing can be extracted
    """
    if not name:
        return "Error: Name parameter is required"

    target_port = _get_instance_port(port)
    simplified = simplify_response(
        safe_get(target_port, f"functions/by-name/{quote(name)}/disassembly")
    )

    succeeded = (
        isinstance(simplified, dict)
        and simplified.get("success", False)
        and "result" in simplified
    )
    if not succeeded:
        fallback = "Error: Could not get disassembly"
        if not (isinstance(simplified, dict) and "error" in simplified):
            return fallback
        failure = simplified["error"]
        if isinstance(failure, dict):
            return failure.get("message", fallback)
        return str(failure)

    # A resource returns just the listing text; try the known shapes in order
    listing = simplified["result"]
    if isinstance(listing, dict):
        # 1) Text already rendered upstream
        if "disassembly_text" in listing:
            return listing["disassembly_text"]

        # 2) Raw instruction records: render "address: bytes mnemonic operands"
        raw_instructions = listing.get("instructions")
        if isinstance(raw_instructions, list):
            def render(entry):
                location = entry.get("address", "")
                opcode = entry.get("mnemonic", "")
                args = entry.get("operands", "")
                raw_bytes = entry.get("bytes", "")
                return f"{location}: {raw_bytes.ljust(10)} {opcode} {args}\n"

            return "".join(
                render(entry) for entry in raw_instructions if isinstance(entry, dict)
            )

        # 3) A direct disassembly field
        if "disassembly" in listing:
            return listing["disassembly"]

    return "Error: Could not extract disassembly from response"
# ================= Enumeration Resources =================
# Lightweight read-only resources for listing/enumerating Ghidra data
# More efficient than tool calls for simple data access
@mcp.resource(uri="ghidra://instances")
def resource_instances_list() -> dict:
    """Enumerate the registered Ghidra instances

    Runs a quick discovery scan first, then reports one lightweight entry per
    registered instance. Use the /instance/{port} resource for detailed
    program info.

    Returns:
        dict: "instances" (port, project, file, url per entry), "count",
        "current_port" and a usage "_hint"
    """
    # Refresh the registry before reading it
    _discover_instances(QUICK_DISCOVERY_RANGE, host=None, timeout=0.5)

    summaries = []
    with instances_lock:
        for instance_port, details in active_instances.items():
            summaries.append({
                "port": instance_port,
                "project": details.get("project", ""),
                "file": details.get("file", ""),
                "url": details.get("url", f"http://{ghidra_host}:{instance_port}")
            })

    return {
        "instances": summaries,
        "count": len(summaries),
        "current_port": current_instance_port,
        "_hint": "Use /instance/{port} for detailed program info"
    }
@mcp.resource(uri="ghidra://instance/{port}/functions")
def resource_functions_list(port: int = None) -> dict:
    """List all functions in the program (lightweight enumeration)

    Returns function names and addresses for quick reference.
    This is a read-only resource - use functions_list tool for filtering/pagination.

    Args:
        port: Ghidra instance port

    Returns:
        dict: List of functions with name, address, and size. If the request
        fails, the simplified error response is returned unchanged; if the
        response is not a dict at all, an INVALID_RESPONSE error envelope.
    """
    max_items = 1000  # Cap for resource response

    port = _get_instance_port(port)
    response = safe_get(port, "functions", {"limit": max_items})
    simplified = simplify_response(response)

    # Other resources in this file guard against a non-dict response before
    # calling .get(); do the same here instead of raising AttributeError.
    if not isinstance(simplified, dict):
        return {
            "success": False,
            "error": {
                "code": "INVALID_RESPONSE",
                "message": "Invalid response format from Ghidra instance"
            },
            "timestamp": int(time.time() * 1000)
        }

    if not simplified.get("success", True):
        return simplified

    functions = simplified.get("result", simplified.get("functions", []))
    if isinstance(functions, dict):
        functions = functions.get("functions", [])
    # A null or otherwise non-list payload is treated as "no functions"
    if not isinstance(functions, list):
        functions = []

    # Extract just the essential fields
    func_list = [
        {
            "name": f.get("name", "unknown"),
            "address": f.get("entryPoint", f.get("address", "")),
            "size": f.get("size", 0)
        }
        for f in functions[:max_items]  # Hard cap
        if isinstance(f, dict)
    ]

    return {
        "functions": func_list,
        "count": len(func_list),
        # NOTE(review): the request already asks for at most max_items, so
        # this is only True if the server returns more than requested.
        "truncated": len(functions) > max_items,
        "_hint": "Use functions_list tool for filtering and pagination of large lists"
    }
@mcp.resource(uri="ghidra://instance/{port}/strings")
def resource_strings_list(port: int = None) -> dict:
    """List defined strings in the program (lightweight enumeration)

    Returns string values and addresses for quick reference.
    Use data_list_strings tool for filtering/pagination.

    Args:
        port: Ghidra instance port

    Returns:
        dict: List of strings with address, value (truncated to 200
        characters) and length. If the request fails, the simplified error
        response is returned unchanged; if the response is not a dict at all,
        an INVALID_RESPONSE error envelope.
    """
    max_items = 500  # Strings can be verbose, cap lower
    max_value_chars = 200

    port = _get_instance_port(port)
    response = safe_get(port, "strings", {"limit": max_items})
    simplified = simplify_response(response)

    # Other resources in this file guard against a non-dict response before
    # calling .get(); do the same here instead of raising AttributeError.
    if not isinstance(simplified, dict):
        return {
            "success": False,
            "error": {
                "code": "INVALID_RESPONSE",
                "message": "Invalid response format from Ghidra instance"
            },
            "timestamp": int(time.time() * 1000)
        }

    if not simplified.get("success", True):
        return simplified

    strings = simplified.get("result", simplified.get("strings", []))
    if isinstance(strings, dict):
        strings = strings.get("strings", [])
    # A null or otherwise non-list payload is treated as "no strings"
    if not isinstance(strings, list):
        strings = []

    # Extract essential fields
    string_list = []
    for s in strings[:max_items]:
        if not isinstance(s, dict):
            continue
        # The text may be under "value" or "string". Resolve it once so the
        # length fallback measures the same text that is reported.
        text = s.get("value", s.get("string", ""))
        if not isinstance(text, str):
            text = "" if text is None else str(text)
        string_list.append({
            "address": s.get("address", ""),
            "value": text[:max_value_chars],  # Truncate long strings
            # Length of the full text, not of the truncated value
            "length": s.get("length", len(text))
        })

    return {
        "strings": string_list,
        "count": len(string_list),
        # NOTE(review): the request already asks for at most max_items, so
        # this is only True if the server returns more than requested.
        "truncated": len(strings) > max_items,
        "_hint": "Use data_list_strings tool for full strings and pagination"
    }
@mcp.resource(uri="ghidra://instance/{port}/data")
def resource_data_list(port: int = None) -> dict:
    """List defined data items in the program (lightweight enumeration)

    Returns data labels, addresses, and types for quick reference.
    Use data_list tool for filtering/pagination.

    Args:
        port: Ghidra instance port

    Returns:
        dict: List of data items with address, name, and type. If the request
        fails, the simplified error response is returned unchanged; if the
        response is not a dict at all, an INVALID_RESPONSE error envelope.
    """
    max_items = 1000

    port = _get_instance_port(port)
    response = safe_get(port, "data", {"limit": max_items})
    simplified = simplify_response(response)

    # Other resources in this file guard against a non-dict response before
    # calling .get(); do the same here instead of raising AttributeError.
    if not isinstance(simplified, dict):
        return {
            "success": False,
            "error": {
                "code": "INVALID_RESPONSE",
                "message": "Invalid response format from Ghidra instance"
            },
            "timestamp": int(time.time() * 1000)
        }

    if not simplified.get("success", True):
        return simplified

    data_items = simplified.get("result", simplified.get("data", []))
    if isinstance(data_items, dict):
        data_items = data_items.get("data", [])
    # A null or otherwise non-list payload is treated as "no data items"
    if not isinstance(data_items, list):
        data_items = []

    # Extract essential fields; each has an alternate key name as fallback
    data_list = [
        {
            "address": d.get("address", ""),
            "name": d.get("name", d.get("label", "")),
            "type": d.get("type", d.get("dataType", ""))
        }
        for d in data_items[:max_items]
        if isinstance(d, dict)
    ]

    return {
        "data": data_list,
        "count": len(data_list),
        # NOTE(review): the request already asks for at most max_items, so
        # this is only True if the server returns more than requested.
        "truncated": len(data_items) > max_items,
        "_hint": "Use data_list tool for filtering and pagination"
    }
@mcp.resource(uri="ghidra://instance/{port}/structs")
def resource_structs_list(port: int = None) -> dict:
    """List defined struct types in the program (lightweight enumeration)

    Returns struct names, sizes, and categories for quick reference.
    Use structs_list tool for filtering/pagination, structs_get for fields.

    Args:
        port: Ghidra instance port

    Returns:
        dict: List of structs with name, size, and category. If the request
        fails, the simplified error response is returned unchanged; if the
        response is not a dict at all, an INVALID_RESPONSE error envelope.
    """
    max_items = 500

    port = _get_instance_port(port)
    response = safe_get(port, "structs", {"limit": max_items})
    simplified = simplify_response(response)

    # Other resources in this file guard against a non-dict response before
    # calling .get(); do the same here instead of raising AttributeError.
    if not isinstance(simplified, dict):
        return {
            "success": False,
            "error": {
                "code": "INVALID_RESPONSE",
                "message": "Invalid response format from Ghidra instance"
            },
            "timestamp": int(time.time() * 1000)
        }

    if not simplified.get("success", True):
        return simplified

    structs = simplified.get("result", simplified.get("structs", []))
    if isinstance(structs, dict):
        structs = structs.get("structs", [])
    # A null or otherwise non-list payload is treated as "no structs"
    if not isinstance(structs, list):
        structs = []

    # Extract essential fields; size and category have alternate key names
    struct_list = [
        {
            "name": s.get("name", ""),
            "size": s.get("size", s.get("length", 0)),
            "category": s.get("category", s.get("categoryPath", ""))
        }
        for s in structs[:max_items]
        if isinstance(s, dict)
    ]

    return {
        "structs": struct_list,
        "count": len(struct_list),
        # NOTE(review): the request already asks for at most max_items, so
        # this is only True if the server returns more than requested.
        "truncated": len(structs) > max_items,
        "_hint": "Use structs_list tool for pagination, structs_get for field details"
    }
@mcp.resource(uri="ghidra://instance/{port}/xrefs/to/{address}")
def resource_xrefs_to(port: int = None, address: str = None) -> dict:
    """List cross-references TO an address (lightweight enumeration)

    Returns references pointing to the specified address.
    Use xrefs_list tool for full filtering/pagination.

    Args:
        port: Ghidra instance port
        address: Target address in hex format

    Returns:
        dict: List of references to this address. If the request fails, the
        simplified error response is returned unchanged; if the response is
        not a dict at all, an INVALID_RESPONSE error envelope.
    """
    if not address:
        return {"error": "Address parameter required"}

    max_items = 200

    port = _get_instance_port(port)
    response = safe_get(port, "xrefs", {"toAddress": address, "limit": max_items})
    simplified = simplify_response(response)

    # Other resources in this file guard against a non-dict response before
    # calling .get(); do the same here instead of raising AttributeError.
    if not isinstance(simplified, dict):
        return {
            "success": False,
            "error": {
                "code": "INVALID_RESPONSE",
                "message": "Invalid response format from Ghidra instance"
            },
            "timestamp": int(time.time() * 1000)
        }

    if not simplified.get("success", True):
        return simplified

    xrefs = simplified.get("result", simplified.get("xrefs", []))
    if isinstance(xrefs, dict):
        xrefs = xrefs.get("xrefs", [])
    # A null or otherwise non-list payload is treated as "no references"
    if not isinstance(xrefs, list):
        xrefs = []

    # Extract essential fields
    xref_list = []
    for x in xrefs[:max_items]:
        if not isinstance(x, dict):
            continue
        context = x.get("context")
        xref_list.append({
            "from": x.get("fromAddress", x.get("from", "")),
            "type": x.get("refType", x.get("type", "")),
            # Missing or empty context is reported as ""; otherwise cap at 100
            "context": context[:100] if context else ""
        })

    return {
        "to_address": address,
        "references": xref_list,
        "count": len(xref_list),
        # NOTE(review): the request already asks for at most max_items, so
        # this is only True if the server returns more than requested.
        "truncated": len(xrefs) > max_items,
        "_hint": "Use xrefs_list tool for full filtering and pagination"
    }
@mcp.resource(uri="ghidra://instance/{port}/xrefs/from/{address}")
def resource_xrefs_from(port: int = None, address: str = None) -> dict:
    """List cross-references FROM an address (lightweight enumeration)

    Returns references originating from the specified address.
    Use xrefs_list tool for full filtering/pagination.

    Args:
        port: Ghidra instance port
        address: Source address in hex format

    Returns:
        dict: List of references from this address. If the request fails, the
        simplified error response is returned unchanged; if the response is
        not a dict at all, an INVALID_RESPONSE error envelope.
    """
    if not address:
        return {"error": "Address parameter required"}

    max_items = 200

    port = _get_instance_port(port)
    response = safe_get(port, "xrefs", {"fromAddress": address, "limit": max_items})
    simplified = simplify_response(response)

    # Other resources in this file guard against a non-dict response before
    # calling .get(); do the same here instead of raising AttributeError.
    if not isinstance(simplified, dict):
        return {
            "success": False,
            "error": {
                "code": "INVALID_RESPONSE",
                "message": "Invalid response format from Ghidra instance"
            },
            "timestamp": int(time.time() * 1000)
        }

    if not simplified.get("success", True):
        return simplified

    xrefs = simplified.get("result", simplified.get("xrefs", []))
    if isinstance(xrefs, dict):
        xrefs = xrefs.get("xrefs", [])
    # A null or otherwise non-list payload is treated as "no references"
    if not isinstance(xrefs, list):
        xrefs = []

    # Extract essential fields
    xref_list = []
    for x in xrefs[:max_items]:
        if not isinstance(x, dict):
            continue
        context = x.get("context")
        xref_list.append({
            "to": x.get("toAddress", x.get("to", "")),
            "type": x.get("refType", x.get("type", "")),
            # Missing or empty context is reported as ""; otherwise cap at 100
            "context": context[:100] if context else ""
        })

    return {
        "from_address": address,
        "references": xref_list,
        "count": len(xref_list),
        # NOTE(review): the request already asks for at most max_items, so
        # this is only True if the server returns more than requested.
        "truncated": len(xrefs) > max_items,
        "_hint": "Use xrefs_list tool for full filtering and pagination"
    }
def _count_endpoint_items(port, endpoint: str):
    """Return the item total reported for *endpoint*, or "unknown" if none is reported."""
    try:
        # limit=1 keeps the query lightweight; only the reported total matters
        response = safe_get(port, endpoint, {"limit": 1})
    except Exception:
        return "unknown"

    if not isinstance(response, dict):
        return "unknown"

    # The total may sit inside a dict-shaped result or at the top level.
    # The result can also be a list, so check its type before calling .get()
    # on it; otherwise the top-level fallback below is never reached.
    result = response.get("result")
    total = result.get("total") if isinstance(result, dict) else None
    if not total:
        top_level_total = response.get("total")
        if top_level_total is not None:
            total = top_level_total

    return total if total is not None else "unknown"


@mcp.resource(uri="ghidra://instance/{port}/summary")
def resource_program_summary(port: int = None) -> dict:
    """Get a comprehensive summary of the loaded program

    Combines instance info with counts of functions, strings, data, etc.
    Useful for getting a quick overview before detailed analysis.

    Args:
        port: Ghidra instance port

    Returns:
        dict: Program summary with statistics. Each statistic is the total
        reported by the server, or "unknown" when no total could be read.
        If the program info lookup fails, its error dict is returned instead.
    """
    port = _get_instance_port(port)

    # Get basic program info
    program_info = ghidra_instance(port=port)
    if "error" in program_info:
        return program_info

    # Get counts (lightweight queries)
    summary = {
        "program": program_info,
        "statistics": {}
    }
    for stat_name, endpoint in (
        ("functions", "functions"),
        ("strings", "strings"),
        ("data_items", "data"),
    ):
        summary["statistics"][stat_name] = _count_endpoint_items(port, endpoint)

    summary["_hint"] = "Use /instance/{port}/functions, /strings, /data for listings"
    return summary
# ================= MCP Prompts =================
# Prompts define reusable templates for LLM interactions
@mcp.prompt("analyze_function")
def analyze_function_prompt(name: str = None, address: str = None, port: int = None):
    """A prompt to guide the LLM through analyzing a function

    When both are given, the address is used for all lookups and the name is
    only used as the display label.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: "prompt" text embedding the decompiled code and disassembly,
        plus "context" carrying the function info
    """
    port = _get_instance_port(port)

    decompiled = ""
    disasm = ""
    fn_info = None

    if address:
        # Fetch the function info once; it supplies the display name when
        # only an address was given and is also returned as context.
        fn_info = function_info_by_address(address=address, port=port)
        if not name and isinstance(fn_info, dict) and "name" in fn_info:
            name = fn_info["name"]
        decompiled = decompiled_function_by_address(address=address, port=port)
        disasm = disassembly_by_address(address=address, port=port)
    elif name:
        decompiled = decompiled_function_by_name(name=name, port=port)
        disasm = disassembly_by_name(name=name, port=port)
        fn_info = function_info_by_name(name=name, port=port)

    # Create the template that guides analysis
    return {
        "prompt": f"""
Analyze the following function: {name or address}
Decompiled code:
```c
{decompiled}
```
Disassembly:
```
{disasm}
```
1. What is the purpose of this function?
2. What are the key parameters and their uses?
3. What are the return values and their meanings?
4. Are there any security concerns in this implementation?
5. Describe the algorithm or process being implemented.
""",
        "context": {
            "function_info": fn_info
        }
    }
@mcp.prompt("identify_vulnerabilities")
def identify_vulnerabilities_prompt(name: str = None, address: str = None, port: int = None):
    """A prompt to help identify potential vulnerabilities in a function

    When both are given, the address is used for all lookups and the name is
    only used as the display label.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: "prompt" text embedding the decompiled code, plus "context"
        carrying the function info and the disassembly
    """
    port = _get_instance_port(port)

    decompiled = ""
    disasm = ""
    fn_info = None

    if address:
        # Fetch the function info once; it supplies the display name when
        # only an address was given and is also returned as context.
        fn_info = function_info_by_address(address=address, port=port)
        if not name and isinstance(fn_info, dict) and "name" in fn_info:
            name = fn_info["name"]
        decompiled = decompiled_function_by_address(address=address, port=port)
        disasm = disassembly_by_address(address=address, port=port)
    elif name:
        decompiled = decompiled_function_by_name(name=name, port=port)
        disasm = disassembly_by_name(name=name, port=port)
        fn_info = function_info_by_name(name=name, port=port)

    # Create the template focused on security analysis
    return {
        "prompt": f"""
Analyze the following function for security vulnerabilities: {name or address}
Decompiled code:
```c
{decompiled}
```
Look for these vulnerability types:
1. Buffer overflows or underflows
2. Integer overflow/underflow
3. Use-after-free or double-free bugs
4. Format string vulnerabilities
5. Missing bounds checks
6. Insecure memory operations
7. Race conditions or timing issues
8. Input validation problems
For each potential vulnerability:
- Describe the vulnerability and where it occurs
- Explain the security impact
- Suggest how it could be exploited
- Recommend a fix
""",
        "context": {
            "function_info": fn_info,
            "disassembly": disasm
        }
    }
@mcp.prompt("reverse_engineer_binary")
def reverse_engineer_binary_prompt(port: int = None):
    """Build a step-by-step guide for reverse engineering a whole binary

    Looks up the program info of the selected Ghidra instance and embeds the
    program name in a fixed, multi-phase analysis plan.

    Args:
        port: Specific Ghidra instance port (optional)

    Returns:
        dict with the guide under "prompt" and the program info under
        "context".
    """
    resolved_port = _get_instance_port(port)

    # Program info is used for the title line and passed along as context
    program_info = ghidra_instance(port=resolved_port)
    binary_name = program_info.get('program_name', 'unknown')

    guide = f"""
# Comprehensive Binary Reverse Engineering Plan
Begin reverse engineering the binary {binary_name} using a methodical approach.
## Phase 1: Initial Reconnaissance
1. Analyze entry points and the main function
2. Identify and catalog key functions and libraries
3. Map the overall program structure
4. Identify important data structures
## Phase 2: Functional Analysis
1. Start with main() or entry point functions and trace the control flow
2. Find and rename all unnamed functions (FUN_*) called from main
3. For each function:
- Decompile and analyze its purpose
- Rename with descriptive names following consistent patterns
- Add comments for complex logic
- Identify parameters and return values
4. Follow cross-references (xrefs) to understand context of function usage
5. Pay special attention to:
- File I/O operations
- Network communication
- Memory allocation/deallocation
- Authentication/encryption routines
- Data processing algorithms
## Phase 3: Data Flow Mapping
1. Identify key data structures and rename them meaningfully
2. Track global variables and their usage across functions
3. Map data transformations through the program
4. Identify sensitive data handling (keys, credentials, etc.)
## Phase 4: Deep Analysis
1. For complex functions, perform deeper analysis using:
- Data flow analysis
- Call graph analysis
- Security vulnerability scanning
2. Look for interesting patterns:
- Command processing routines
- State machines
- Protocol implementations
- Cryptographic operations
## Implementation Strategy
1. Start with functions called from main
2. Search for unnamed functions with pattern "FUN_*"
3. Decompile each function and analyze its purpose
4. Look at its call graph and cross-references to understand context
5. Rename the function based on its behavior
6. Document key insights
7. Continue iteratively until the entire program flow is mapped
## Function Prioritization
1. Start with entry points and initialization functions
2. Focus on functions with high centrality in the call graph
3. Pay special attention to functions with:
- Command processing logic
- Error handling
- Security checks
- Data transformation
Remember to use the available MCGhidra tools:
- Use functions_list to find functions matching patterns
- Use xrefs_list to find cross-references
- Use functions_decompile for C-like representations
- Use functions_disassemble for lower-level analysis
- Use functions_rename to apply meaningful names
- Use data_* tools to work with program data
"""
    return {"prompt": guide, "context": {"program_info": program_info}}
@mcp.prompt("analyze_strings")
def analyze_strings_prompt(port: int = None, pattern: str = None):
    """A prompt to analyze string references in the binary

    Useful for finding hardcoded paths, URLs, error messages, and other
    interesting strings. Requests up to 100 strings (optionally filtered by
    ``pattern``, case-insensitively) and displays at most the first 50.

    Args:
        port: Specific Ghidra instance port (optional)
        pattern: Optional grep pattern to filter strings (e.g., "http", "password", "error")

    Returns:
        dict with a "prompt" string and a "context" dict containing
        "strings_count", "filter_pattern" and "sample_strings" (first 20).
    """
    port = _get_instance_port(port)

    # Get strings from the binary
    strings_result = data_list_strings(port=port, page_size=100, grep=pattern, grep_ignorecase=True)
    strings_list = []
    if isinstance(strings_result, dict):
        strings_list = strings_result.get("strings", strings_result.get("items", []))

    # Only the first 50 entries are rendered into the prompt
    shown = strings_list[:50]

    # Format strings for display. The value is coerced with str() before
    # truncation so a non-string value cannot raise on slicing; this matches
    # how the other string-listing prompts in this file format entries.
    strings_display = "\n".join([
        f" {s.get('address', 'N/A')}: {str(s.get('value', s.get('string', s)))[:80]}"
        for s in shown
    ]) if strings_list else "No strings found matching criteria"

    filter_note = f" matching '{pattern}'" if pattern else ""

    return {
        "prompt": f"""
# String Analysis for Binary
Analyze the following strings{filter_note} found in the binary:
```
{strings_display}
```
Total strings shown: {len(shown)} of {len(strings_list)}
## Analysis Tasks:
1. **Categorize Strings**: Group strings by type:
- File paths and system locations
- URLs and network addresses
- Error messages and debug strings
- Format strings (printf-style)
- Cryptographic constants or keys
- Configuration values
- User-visible messages
2. **Identify Interesting Patterns**:
- Look for hardcoded credentials or API keys
- Find debug/logging messages that reveal functionality
- Locate error handlers and their messages
- Identify protocol-related strings
3. **Cross-Reference Analysis**:
- For interesting strings, use xrefs_list to find where they're used
- Trace back to understand the context of usage
4. **Security Implications**:
- Note any strings that suggest security features
- Identify potential information disclosure
- Look for authentication/authorization related strings
## Recommended Follow-up Tools:
- `xrefs_list(address="<string_address>")` - Find code using a string
- `functions_decompile(address="<func_addr>")` - Analyze functions using interesting strings
- `data_list_strings(grep="<pattern>")` - Search for more specific patterns
""",
        "context": {
            "strings_count": len(strings_list),
            "filter_pattern": pattern,
            "sample_strings": strings_list[:20]
        }
    }
@mcp.prompt("trace_data_flow")
def trace_data_flow_prompt(name: str = None, address: str = None, port: int = None):
    """A prompt to trace data flow through a function

    Analyzes how data moves through a function, tracking inputs to outputs.
    ``address`` takes precedence over ``name`` when both are given.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format
        port: Specific Ghidra instance port (optional)

    Returns:
        dict with a "prompt" string and a "context" dict containing
        "function_info", "variables" and "dataflow_sample" (first 10 entries).
        When neither ``name`` nor ``address`` is given, the prompt is an
        error message and the context is empty.
    """
    port = _get_instance_port(port)

    # Get function info
    fn_info = None
    decompiled = ""
    variables = []
    dataflow = []

    if address:
        fn_info = function_info_by_address(address=address, port=port)
        decompiled = decompiled_function_by_address(address=address, port=port)
        vars_result = function_variables_by_address(address=address, port=port)
        dataflow_result = analysis_get_dataflow(address=address, port=port, page_size=50)
    elif name:
        fn_info = function_info_by_name(name=name, port=port)
        decompiled = decompiled_function_by_name(name=name, port=port)
        vars_result = function_variables_by_name(name=name, port=port)
        # The dataflow call is made by address, so resolve one from the
        # function info. Other prompts in this file read a function's address
        # from either "address" or "entry_point"; accept both here so the
        # dataflow lookup is not silently skipped.
        # NOTE(review): confirm which key function_info_by_name returns.
        entry = None
        if isinstance(fn_info, dict):
            entry = fn_info.get("entry_point") or fn_info.get("address")
        if entry:
            dataflow_result = analysis_get_dataflow(address=entry, port=port, page_size=50)
        else:
            dataflow_result = {}
    else:
        return {"prompt": "Error: Must provide either name or address", "context": {}}

    if isinstance(vars_result, dict):
        variables = vars_result.get("variables", [])
    if isinstance(dataflow_result, dict):
        dataflow = dataflow_result.get("dataflow", dataflow_result.get("items", []))

    # Format variables (first 20 only)
    vars_display = "\n".join([
        f" {v.get('name', 'N/A')}: {v.get('type', 'unknown')} ({v.get('storage', 'N/A')})"
        for v in variables[:20]
    ]) if variables else "No variables found"

    # Prefer the name reported by the function info for the title
    func_name = name or address
    if isinstance(fn_info, dict):
        func_name = fn_info.get("name", func_name)

    return {
        "prompt": f"""
# Data Flow Analysis: {func_name}
## Decompiled Code:
```c
{decompiled}
```
## Variables:
```
{vars_display}
```
## Analysis Tasks:
1. **Input Identification**:
- Identify all function parameters and their types
- Find global variables accessed by this function
- Locate any data read from external sources (files, network, etc.)
2. **Data Transformation Tracking**:
- Trace how input data is modified through the function
- Identify any encoding/decoding operations
- Note arithmetic or bitwise operations on data
- Track buffer copies and string manipulations
3. **Output Analysis**:
- Identify return values and their sources
- Find any output parameters (pointers modified)
- Locate data written to external destinations
4. **Taint Analysis**:
- Mark user-controlled inputs as "tainted"
- Trace tainted data through the function
- Identify if tainted data reaches sensitive operations:
* Memory allocation sizes
* Array indices
* Format strings
* System calls
* Cryptographic functions
5. **Data Dependencies**:
- Map dependencies between variables
- Identify critical paths where data must be validated
- Note any sanitization or validation routines
## Security Focus:
- Does user input reach memory operations without bounds checking?
- Is data properly validated before use in sensitive contexts?
- Are there any type confusions or integer issues?
## Recommended Follow-up:
- `analysis_get_dataflow(address="...")` - Get detailed dataflow graph
- `xrefs_list(address="...")` - Find callers to understand input sources
- `analysis_get_callgraph(address="...")` - See what this function calls
""",
        "context": {
            "function_info": fn_info,
            "variables": variables,
            "dataflow_sample": dataflow[:10]
        }
    }
@mcp.prompt("identify_crypto")
async def identify_crypto_prompt(port: int = None, ctx: Context = None):
    """Build a prompt listing likely cryptographic functions and strings

    Runs one function-name search per crypto keyword and one string search
    per string pattern, reporting a progress step before each query, then
    embeds the de-duplicated hits in a fixed analysis checklist.

    Args:
        port: Specific Ghidra instance port (optional)
        ctx: FastMCP context for progress reporting (auto-injected)

    Returns:
        dict with a "prompt" string and a "context" dict containing
        "crypto_functions" (first 20), "crypto_strings" (first 10) and
        "function_count".
    """
    port = _get_instance_port(port)

    # 15 function-name patterns + 5 string patterns = 20 progress steps
    name_patterns = [
        "crypt", "cipher", "aes", "des", "rsa", "sha", "md5", "hash",
        "encrypt", "decrypt", "key", "ssl", "tls", "hmac", "pbkdf"
    ]
    string_patterns = ["BEGIN.*KEY", "-----", "AES", "RSA", "SHA"]
    total_steps = len(name_patterns) + len(string_patterns)
    step = 0

    # Pass 1: functions whose names match a crypto keyword
    name_hits = []
    for needle in name_patterns:
        step += 1
        await report_step(ctx, step, total_steps, f"Scanning functions for '{needle}'")
        listing = functions_list(port=port, grep=needle, grep_ignorecase=True, page_size=20)
        if isinstance(listing, dict):
            name_hits.extend(listing.get("functions", listing.get("items", [])))

    # Keep the first hit per address; dict insertion order preserves ranking
    first_by_addr = {}
    for fn in name_hits:
        first_by_addr.setdefault(fn.get("address", fn.get("entry_point", str(fn))), fn)
    unique_funcs = list(first_by_addr.values())

    # Pass 2: strings matching crypto-related patterns
    crypto_strings = []
    for needle in string_patterns:
        step += 1
        await report_step(ctx, step, total_steps, f"Scanning strings for '{needle}'")
        listing = data_list_strings(port=port, grep=needle, grep_ignorecase=True, page_size=10)
        if isinstance(listing, dict):
            crypto_strings.extend(listing.get("strings", listing.get("items", [])))

    # Render both result sets for the prompt body
    if unique_funcs:
        funcs_display = "\n".join(
            f" {fn.get('address', fn.get('entry_point', 'N/A'))}: {fn.get('name', 'unknown')}"
            for fn in unique_funcs[:30]
        )
    else:
        funcs_display = "No obvious crypto functions found by name"

    if crypto_strings:
        strings_display = "\n".join(
            f" {entry.get('address', 'N/A')}: {str(entry.get('value', entry.get('string', entry)))[:60]}"
            for entry in crypto_strings[:20]
        )
    else:
        strings_display = "No obvious crypto strings found"

    prompt_text = f"""
# Cryptographic Analysis
## Potentially Crypto-Related Functions:
```
{funcs_display}
```
## Potentially Crypto-Related Strings:
```
{strings_display}
```
## Analysis Tasks:
1. **Identify Crypto Libraries**:
- Look for OpenSSL, mbedTLS, wolfSSL, or other library signatures
- Check for statically linked crypto code
- Identify any custom implementations
2. **Algorithm Identification**:
- **Symmetric**: AES, DES, 3DES, ChaCha20, RC4, Blowfish
- **Asymmetric**: RSA, ECC, DH, DSA
- **Hash**: SHA-1/256/512, MD5, BLAKE2
- **MAC**: HMAC, CMAC, Poly1305
- **KDF**: PBKDF2, scrypt, Argon2
3. **Constant Analysis**:
Look for these magic constants:
- AES S-box: 0x63, 0x7c, 0x77, 0x7b...
- SHA-256 init: 0x6a09e667, 0xbb67ae85...
- MD5 init: 0x67452301, 0xefcdab89...
- RSA public exponent: 0x10001 (65537)
4. **Key Handling**:
- How are keys generated or derived?
- Where are keys stored?
- Are keys properly protected in memory?
- Is there key rotation or expiration?
5. **Implementation Review**:
- Check for weak algorithms (MD5, SHA-1, RC4, DES)
- Look for ECB mode usage (insecure for most cases)
- Verify IV/nonce handling (should be random/unique)
- Check for hardcoded keys or IVs
6. **Security Concerns**:
- Timing side-channels in comparisons
- Insufficient key lengths
- Poor random number generation
- Key material in logs or error messages
## Recommended Follow-up:
- Decompile identified crypto functions for detailed analysis
- Check xrefs to understand where crypto is used
- Look for key generation/storage functions
- Search for random number generation (rand, /dev/urandom, etc.)
"""
    return {
        "prompt": prompt_text,
        "context": {
            "crypto_functions": unique_funcs[:20],
            "crypto_strings": crypto_strings[:10],
            "function_count": len(unique_funcs)
        }
    }
@mcp.prompt("malware_triage")
async def malware_triage_prompt(port: int = None, ctx: Context = None):
    """Build a structured first-pass malware triage prompt

    Gathers program info, searches for well-known entry point names,
    suspicious strings and suspicious function names (reporting progress
    before each query), and embeds the findings in a triage checklist.

    Args:
        port: Specific Ghidra instance port (optional)
        ctx: FastMCP context for progress reporting (auto-injected)

    Returns:
        dict with a "prompt" string and a "context" dict containing
        "program_info", "entry_points" and the two hit counts.
    """
    port = _get_instance_port(port)

    # 1 (program info) + 6 (entry points) + 8 (strings) + 6 (functions) = 21
    total_steps = 21

    await report_step(ctx, 1, total_steps, "Getting program info")
    program_info = ghidra_instance(port=port)
    step = 1

    # Entry points: exact-name matches for the usual suspects
    main_funcs = []
    for entry_name in ["main", "_main", "WinMain", "DllMain", "start", "_start"]:
        step += 1
        await report_progress(ctx, step, total_steps, f"Searching for {entry_name}")
        listing = functions_list(port=port, grep=f"^{entry_name}$", page_size=5)
        if isinstance(listing, dict):
            main_funcs.extend(listing.get("functions", listing.get("items", [])))

    # Strings hinting at shells, injection APIs or network endpoints
    suspicious_patterns = [
        "cmd.exe", "powershell", "/bin/sh", "CreateRemoteThread",
        "VirtualAlloc", "WriteProcessMemory", "http://", "https://",
    ]
    suspicious_strings = []
    for needle in suspicious_patterns:
        step += 1
        await report_progress(ctx, step, total_steps, f"Scanning strings for '{needle}'")
        listing = data_list_strings(port=port, grep=needle, grep_ignorecase=True, page_size=5)
        if isinstance(listing, dict):
            hits = listing.get("strings", listing.get("items", []))
            # Tag each hit with the pattern that found it (shown in the prompt)
            for hit in hits:
                hit['_pattern'] = needle
            suspicious_strings.extend(hits)

    # Function/import names commonly associated with injection and loading
    suspicious_funcs = []
    for needle in ["Virtual", "CreateThread", "LoadLibrary", "GetProcAddress", "Shell", "Inject"]:
        step += 1
        await report_progress(ctx, step, total_steps, f"Scanning functions for '{needle}'")
        listing = functions_list(port=port, grep=needle, grep_ignorecase=True, page_size=5)
        if isinstance(listing, dict):
            suspicious_funcs.extend(listing.get("functions", listing.get("items", [])))

    # Render the three result sets
    if main_funcs:
        main_display = "\n".join(
            f" {fn.get('address', fn.get('entry_point', 'N/A'))}: {fn.get('name', 'unknown')}"
            for fn in main_funcs
        )
    else:
        main_display = "No standard entry points found"

    if suspicious_strings:
        strings_display = "\n".join(
            f" [{hit.get('_pattern', '?')}] {hit.get('address', 'N/A')}: {str(hit.get('value', hit.get('string', hit)))[:50]}"
            for hit in suspicious_strings[:20]
        )
    else:
        strings_display = "No suspicious strings found"

    if suspicious_funcs:
        funcs_display = "\n".join(
            f" {fn.get('address', fn.get('entry_point', 'N/A'))}: {fn.get('name', 'unknown')}"
            for fn in suspicious_funcs[:20]
        )
    else:
        funcs_display = "No suspicious functions found"

    prompt_text = f"""
# Malware Triage Analysis
**Binary**: {program_info.get('program_name', 'unknown')}
**Format**: {program_info.get('format', 'unknown')}
**Architecture**: {program_info.get('processor', 'unknown')}
## Entry Points:
```
{main_display}
```
## Suspicious Strings:
```
{strings_display}
```
## Suspicious Functions:
```
{funcs_display}
```
## Triage Checklist:
### 1. Static Indicators
- [ ] Check for packed/obfuscated sections
- [ ] Identify compiler and build artifacts
- [ ] Look for anti-analysis techniques
- [ ] Check import table for suspicious APIs
- [ ] Examine strings for IOCs (IPs, domains, paths)
### 2. Capability Assessment
**Persistence Mechanisms:**
- Registry modifications (RegSetValue, RegCreateKey)
- Service creation (CreateService, StartService)
- Scheduled tasks
- Startup folder modifications
**Network Capabilities:**
- C2 communication patterns
- Data exfiltration methods
- Download/upload functionality
- Protocol usage (HTTP, DNS, custom)
**Process Manipulation:**
- Process injection (WriteProcessMemory, CreateRemoteThread)
- Process hollowing
- DLL injection
- Thread hijacking
**Evasion Techniques:**
- Anti-debugging (IsDebuggerPresent, CheckRemoteDebugger)
- Anti-VM detection
- Timing checks
- Environment checks
**Payload Delivery:**
- Shellcode execution
- Reflective loading
- File dropping
- Memory-only execution
### 3. Priority Functions to Analyze
1. Entry point / main function
2. Functions with network-related names
3. Functions calling VirtualAlloc + Write + Execute
4. Functions with obfuscated names or unusual patterns
5. Error handlers and cleanup routines
### 4. IOC Extraction
- Extract all URLs, IPs, and domains
- Note file paths and registry keys
- Document mutex names
- Record any hardcoded credentials
## Recommended Follow-up:
- `functions_decompile(name="<entry_point>")` - Analyze main logic
- `xrefs_list(address="<suspicious_func>")` - Find usage patterns
- `data_list_strings(grep="<pattern>")` - Search for more IOCs
- `analysis_get_callgraph(address="...")` - Map execution flow
"""
    return {
        "prompt": prompt_text,
        "context": {
            "program_info": program_info,
            "entry_points": main_funcs,
            "suspicious_strings_count": len(suspicious_strings),
            "suspicious_funcs_count": len(suspicious_funcs)
        }
    }
@mcp.prompt("analyze_protocol")
def analyze_protocol_prompt(name: str = None, address: str = None, port: int = None):
    """Build a prompt for reverse engineering protocol parsers and handlers

    Optionally decompiles one target function (``address`` wins over
    ``name``), searches function names for the first five protocol keywords,
    and embeds both in a protocol analysis framework.

    Args:
        name: Function name to analyze (optional)
        address: Function address to analyze (optional)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict with a "prompt" string and a "context" dict containing
        "target_function" and "protocol_functions" (first 15).
    """
    port = _get_instance_port(port)

    # Details of the explicitly requested function, if any
    target_decompiled = ""
    target_info = None
    if address:
        target_decompiled = decompiled_function_by_address(address=address, port=port)
        target_info = function_info_by_address(address=address, port=port)
    elif name:
        target_decompiled = decompiled_function_by_name(name=name, port=port)
        target_info = function_info_by_name(name=name, port=port)

    # Keyword search; only the first five keywords are actually queried
    protocol_patterns = ["parse", "read", "recv", "process", "handle", "decode", "packet", "message", "frame", "header"]
    keyword_hits = []
    for keyword in protocol_patterns[:5]:
        listing = functions_list(port=port, grep=keyword, grep_ignorecase=True, page_size=10)
        if isinstance(listing, dict):
            keyword_hits.extend(listing.get("functions", listing.get("items", [])))

    # Deduplicate by address, keeping the first occurrence
    first_by_addr = {}
    for fn in keyword_hits:
        first_by_addr.setdefault(fn.get("address", fn.get("entry_point", str(fn))), fn)
    unique_funcs = list(first_by_addr.values())

    if unique_funcs:
        funcs_display = "\n".join(
            f" {fn.get('address', fn.get('entry_point', 'N/A'))}: {fn.get('name', 'unknown')}"
            for fn in unique_funcs[:25]
        )
    else:
        funcs_display = "No obvious protocol functions found"

    # Optional section showing the requested function's decompilation
    target_section = ""
    if target_decompiled:
        if isinstance(target_info, dict):
            func_name = target_info.get("name", name or address)
        else:
            func_name = name or address
        target_section = f"""
## Target Function: {func_name}
```c
{target_decompiled}
```
"""

    prompt_text = f"""
# Protocol Analysis
{target_section}
## Potentially Protocol-Related Functions:
```
{funcs_display}
```
## Analysis Framework:
### 1. Message Structure
- **Header Analysis**: Identify fixed-size headers
- Magic bytes / signature
- Version field
- Message type / opcode
- Length field(s)
- Flags / options
- Checksum / CRC
- **Payload Analysis**: Variable-length data
- Field delimiters
- Length-prefixed fields
- Nested structures
- Padding / alignment
### 2. State Machine
- Identify protocol states (init, handshake, established, etc.)
- Map state transitions
- Find state storage variables
- Identify timeout handling
### 3. Message Types
For each message type, document:
- Opcode / type identifier
- Required fields
- Optional fields
- Expected responses
- Error conditions
### 4. Parsing Logic
- Buffer handling (how is input buffered?)
- Boundary checking (are lengths validated?)
- Error handling (what happens on malformed input?)
- Memory management (allocations, frees)
### 5. Security Analysis
- Integer overflows in length calculations
- Buffer overflows from unchecked lengths
- Format string issues
- Injection vulnerabilities
- Authentication/authorization checks
- Encryption/signing of messages
### 6. Documentation Format
Create protocol documentation:
```
+--------+--------+--------+--------+
| Magic | Ver | Type | Length |
+--------+--------+--------+--------+
| Payload... |
+--------+--------+--------+--------+
```
## Recommended Analysis Flow:
1. Find the main receive/read loop
2. Identify the dispatch table or switch statement
3. Analyze each message handler
4. Document the message format
5. Look for authentication handshakes
6. Check for encryption setup
## Recommended Tools:
- `functions_decompile(name="...")` - Analyze handler functions
- `structs_list()` / `structs_get()` - Find message structures
- `data_list_strings(grep="error")` - Find error messages
- `analysis_get_callgraph(address="...")` - Map handler relationships
"""
    return {
        "prompt": prompt_text,
        "context": {
            "target_function": target_info,
            "protocol_functions": unique_funcs[:15]
        }
    }
@mcp.prompt("find_main_logic")
async def find_main_logic_prompt(port: int = None, ctx: Context = None):
    """A prompt to find the main application logic past runtime initialization

    Helps navigate past CRT startup, library initialization, and boilerplate
    to find where the actual program logic begins. Reports progress during scanning.

    Args:
        port: Specific Ghidra instance port (optional)
        ctx: FastMCP context for progress reporting (auto-injected)

    Returns:
        dict with a "prompt" string and a "context" dict containing
        "program_info", "entry_candidates" (first 10), "init_functions"
        (first 10) and "main_function" (the matched candidate or None).
    """
    port = _get_instance_port(port)

    # Standard entry point names across platforms. Some names are shared
    # between platforms (macOS reuses "_main" and "start"), so the list is
    # de-duplicated in order: each name is queried once and cannot produce
    # duplicate rows in the candidate list.
    entry_names = list(dict.fromkeys([
        # Unix/Linux
        "main", "_main", "__main", "start", "_start", "__libc_start_main",
        # Windows
        "WinMain", "wWinMain", "wmain", "_wmain", "WinMainCRTStartup",
        "mainCRTStartup", "wmainCRTStartup", "wWinMainCRTStartup",
        # Windows DLL
        "DllMain", "DllMainCRTStartup", "_DllMainCRTStartup@12",
        # macOS
        "_main", "start"
    ]))
    # Functions with "init" or "setup" in the name that might be called early
    init_patterns = ["init", "setup", "initialize"]

    # 1 (program info) + entry names + init patterns + 1 (decompile main).
    # Derived from the lists so reported steps can never exceed the total.
    total_steps = 1 + len(entry_names) + len(init_patterns) + 1

    # Get program info
    await report_step(ctx, 1, total_steps, "Getting program info")
    program_info = ghidra_instance(port=port)

    # Find entry points and potential main functions (exact-name matches)
    entry_candidates = []
    for idx, name in enumerate(entry_names, start=2):
        await report_step(ctx, idx, total_steps, f"Searching for {name}")
        funcs = functions_list(port=port, grep=f"^{name}$", page_size=5)
        if isinstance(funcs, dict):
            for f in funcs.get("functions", funcs.get("items", [])):
                f['_match_type'] = 'exact_name'
                entry_candidates.append(f)

    # Scan for initialization helpers, keeping at most 5 hits per pattern
    init_funcs = []
    base_step = 2 + len(entry_names)
    for idx, pattern in enumerate(init_patterns, start=base_step):
        await report_step(ctx, idx, total_steps, f"Scanning for {pattern} functions")
        funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=10)
        if isinstance(funcs, dict):
            init_funcs.extend(funcs.get("functions", funcs.get("items", []))[:5])

    # Get decompilation of main entry point if found: the first candidate
    # whose name is a main/WinMain variant wins.
    main_decompiled = ""
    main_entry = None
    for candidate in entry_candidates:
        name = candidate.get("name", "")
        if name.lower() in ("main", "_main", "winmain", "wwinmain"):
            main_entry = candidate
            addr = candidate.get("address", candidate.get("entry_point"))
            if addr:
                # Final step counted in total_steps
                await report_step(ctx, total_steps, total_steps, f"Decompiling {name}")
                main_decompiled = decompiled_function_by_address(address=addr, port=port)
            break

    # Format entry points
    entries_display = "\n".join([
        f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}"
        for f in entry_candidates[:15]
    ]) if entry_candidates else "No standard entry points found"

    init_display = "\n".join([
        f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}"
        for f in init_funcs[:10]
    ]) if init_funcs else "No initialization functions found"

    # Optional section with the decompiled main function
    main_section = ""
    if main_decompiled and main_entry:
        main_section = f"""
## Main Function: {main_entry.get('name', 'unknown')}
```c
{main_decompiled}
```
"""

    return {
        "prompt": f"""
# Finding the Main Application Logic
**Binary**: {program_info.get('program_name', 'unknown')}
**Architecture**: {program_info.get('language', 'unknown')}
## Entry Point Candidates
```
{entries_display}
```
## Initialization Functions
```
{init_display}
```
{main_section}
## Analysis Strategy
### Phase 1: Identify True Entry Point
The program's execution flow typically follows this pattern:
```
OS Loader
└─→ _start / Entry Point (CRT startup)
└─→ __libc_start_main (glibc) / mainCRTStartup (MSVC)
└─→ Global constructors (__init_array, .ctors)
└─→ main() / WinMain() ← ACTUAL LOGIC STARTS HERE
```
**For ELF binaries:**
1. `_start` calls `__libc_start_main(main, argc, argv, ...)`
2. Look for the first argument passed to `__libc_start_main` - that's `main`
3. Or find function called after `__libc_csu_init`
**For PE binaries:**
1. Entry point is usually `mainCRTStartup` or `WinMainCRTStartup`
2. Look for call to `main`/`WinMain` after `__security_init_cookie`
3. Check for `_initterm` calls (global constructor invocation)
### Phase 2: Navigate Past Boilerplate
**Skip these patterns:**
- Security cookie initialization (`__security_init_cookie`)
- Heap initialization (`_heap_init`, `HeapCreate`)
- Locale/encoding setup (`setlocale`, `_setmbcp`)
- Exception handler registration (`__try`/`__except` setup)
- TLS callbacks (check `.tls` section)
- ATL/MFC initialization (look for `AfxWinMain`)
**Find the real logic by looking for:**
- Command-line argument processing (`argc`, `argv`, `GetCommandLine`)
- Configuration file loading
- Main event loop or service dispatcher
- First significant branching based on user input
### Phase 3: Map the Core Logic
Once you find `main` or equivalent:
1. **Identify the primary dispatch pattern:**
- Is it a CLI tool (argument parsing → action)?
- Is it a service (initialization → main loop)?
- Is it a GUI app (window creation → message pump)?
2. **Find the "inner main":**
- Many programs have a wrapper main that just calls the real logic
- Look for the function that receives parsed arguments
- Often named like `real_main`, `app_main`, `do_work`, etc.
3. **Document the high-level flow:**
```
main()
├── parse_arguments()
├── initialize_subsystems()
├── load_configuration()
└── run_main_loop() ← Primary logic here
```
### Red Flags (Not Main Logic)
- Functions with `crt`, `init`, `startup` in name
- Functions that only call other init functions
- Functions setting up global state without processing input
- Exception handler registration functions
## Recommended Next Steps
1. `functions_decompile(name="main")` - Analyze main if found
2. `analysis_get_callgraph(name="main")` - See what main calls
3. `xrefs_list(address="<main_addr>")` - Verify main is called from CRT
4. Look for the first function that processes `argc`/`argv` or user input
""",
        "context": {
            "program_info": program_info,
            "entry_candidates": entry_candidates[:10],
            "init_functions": init_funcs[:10],
            "main_function": main_entry
        }
    }
@mcp.prompt("analyze_imports")
async def analyze_imports_prompt(port: int = None, ctx: Context = None):
    """Build a prompt summarizing a binary's capabilities from function names

    For each capability category, searches function names for the first five
    indicator patterns (case-insensitively), reporting one progress step per
    category. Matches are grouped by category, a few category combinations
    are flagged, and the result is embedded in an import analysis framework.

    Args:
        port: Specific Ghidra instance port (optional)
        ctx: FastMCP context for progress reporting (auto-injected)

    Returns:
        dict with a "prompt" string and a "context" dict containing
        "program_info", "capabilities" (up to 10 names per category),
        "total_imports_analyzed" and "suspicious_patterns".
    """
    port = _get_instance_port(port)

    # Step 1 of 12: program info
    await report_step(ctx, 1, 12, "Getting program info")
    program_info = ghidra_instance(port=port)

    # Capability categories mapped to their indicator name fragments
    categories = {
        "File Operations": ["CreateFile", "ReadFile", "WriteFile", "DeleteFile", "fopen", "fread", "fwrite", "open", "read", "write", "unlink", "remove"],
        "Network": ["socket", "connect", "send", "recv", "WSAStartup", "getaddrinfo", "inet_", "http", "InternetOpen", "WinHttpOpen", "URLDownload"],
        "Process/Thread": ["CreateProcess", "CreateThread", "CreateRemoteThread", "OpenProcess", "TerminateProcess", "fork", "exec", "pthread"],
        "Memory": ["VirtualAlloc", "VirtualProtect", "WriteProcessMemory", "ReadProcessMemory", "mmap", "mprotect", "malloc", "HeapAlloc"],
        "Registry (Windows)": ["RegOpenKey", "RegSetValue", "RegQueryValue", "RegCreateKey", "RegDeleteKey"],
        "Crypto": ["Crypt", "BCrypt", "NCrypt", "AES", "RSA", "SHA", "MD5", "SSL", "TLS", "EVP_"],
        "DLL/Library": ["LoadLibrary", "GetProcAddress", "dlopen", "dlsym", "FreeLibrary"],
        "User Interface": ["MessageBox", "CreateWindow", "GetDlgItem", "DialogBox", "gtk_", "Qt"],
        "Service": ["CreateService", "StartService", "OpenSCManager", "ControlService"],
        "Debugging/Evasion": ["IsDebuggerPresent", "CheckRemoteDebugger", "NtQueryInformationProcess", "OutputDebugString", "ptrace"],
    }

    # Steps 2..11: one per category (10 categories + 1 info + 1 analysis = 12)
    capability_results = {}
    all_found = []
    step = 1
    for category, patterns in categories.items():
        step += 1
        await report_step(ctx, step, 12, f"Scanning {category}")

        category_hits = []
        for needle in patterns[:5]:  # Limit queries per category
            listing = functions_list(port=port, grep=needle, grep_ignorecase=True, page_size=10)
            if not isinstance(listing, dict):
                continue
            for fn in listing.get("functions", listing.get("items", [])):
                # Tag each hit with where it came from
                fn['_category'] = category
                fn['_pattern'] = needle
                category_hits.append(fn)
                all_found.append(fn)

        # Deduplicate within the category, first occurrence per address wins
        first_by_addr = {}
        for fn in category_hits:
            first_by_addr.setdefault(fn.get("address", fn.get("entry_point", str(fn))), fn)
        if first_by_addr:
            capability_results[category] = list(first_by_addr.values())

    # Two summary lines per category: a heading and up to five names
    capability_summary = []
    for category, funcs in capability_results.items():
        shown_names = [fn.get('name', 'unknown') for fn in funcs[:5]]
        extras = f" (+{len(funcs)-5} more)" if len(funcs) > 5 else ""
        capability_summary.extend([
            f"**{category}** ({len(funcs)} functions)",
            f" └─ {', '.join(shown_names)}{extras}",
        ])
    if capability_summary:
        summary_display = "\n".join(capability_summary)
    else:
        summary_display = "No notable imports detected"

    # Flag category combinations worth a closer look
    suspicious_combos = []
    cats = set(capability_results.keys())
    if (
        "Memory" in cats
        and "Process/Thread" in cats
        and any("WriteProcessMemory" in fn.get('name', '') for fn in capability_results["Memory"])
    ):
        suspicious_combos.append("⚠️ **Process Injection Pattern**: Memory + Process manipulation detected")
    if "Network" in cats and "Crypto" in cats:
        suspicious_combos.append("🔐 **Encrypted Communication**: Network + Crypto APIs present")
    if "DLL/Library" in cats and "Memory" in cats:
        suspicious_combos.append("⚠️ **Dynamic Loading Pattern**: LoadLibrary + Memory manipulation")
    if "Debugging/Evasion" in cats:
        suspicious_combos.append("🛡️ **Anti-Analysis**: Debugger detection APIs present")
    if "Service" in cats and "Registry (Windows)" in cats:
        suspicious_combos.append("📌 **Persistence Pattern**: Service + Registry access")
    if suspicious_combos:
        combos_display = "\n".join(suspicious_combos)
    else:
        combos_display = "No suspicious combinations detected"

    prompt_text = f"""
# Import Analysis Report
**Binary**: {program_info.get('program_name', 'unknown')}
**Architecture**: {program_info.get('language', 'unknown')}
## Capability Summary
{summary_display}
## Suspicious Combinations
{combos_display}
## Detailed Analysis Framework
### 1. Capability Risk Assessment
| Capability | Risk Level | Investigation Priority |
|------------|------------|----------------------|
| Process Injection (WriteProcessMemory + CreateRemoteThread) | 🔴 Critical | Immediate |
| Code Download & Execute (URLDownload + ShellExecute) | 🔴 Critical | Immediate |
| Anti-Debugging | 🟡 Medium | High |
| Registry Persistence | 🟡 Medium | High |
| Encrypted Network I/O | 🟡 Medium | Medium |
| Standard File I/O | 🟢 Low | Low |
### 2. Import Pattern Analysis
**Injection Indicators:**
- `VirtualAllocEx` + `WriteProcessMemory` + `CreateRemoteThread` = Classic injection
- `NtCreateThreadEx` + `NtMapViewOfSection` = Stealthier injection
- `SetWindowsHookEx` = DLL injection via hooks
- `QueueUserAPC` = APC injection
**Evasion Indicators:**
- `IsDebuggerPresent`, `CheckRemoteDebuggerPresent` = Basic anti-debug
- `NtQueryInformationProcess` (ProcessDebugPort) = Advanced anti-debug
- `GetTickCount` comparisons = Timing-based detection
- `rdtsc` instruction usage = VM/sandbox detection
**Persistence Indicators:**
- `RegSetValueEx` with Run/RunOnce keys
- `CreateService` / `ChangeServiceConfig`
- `SchRpcRegisterTask` = Scheduled tasks
- `CopyFile` to startup locations
**Data Exfiltration Indicators:**
- `InternetOpen` + `InternetConnect` + `HttpSendRequest`
- `socket` + `connect` to non-standard ports
- `CryptEncrypt` before network send
- `compress` / `zip` functions before send
### 3. Library-Specific Patterns
**OpenSSL Indicators:**
- `SSL_CTX_new`, `SSL_connect`, `SSL_read`, `SSL_write`
- Likely secure communications
**Windows Crypto API:**
- `CryptAcquireContext`, `CryptCreateHash`, `CryptEncrypt`
- Check for hardcoded keys or weak algorithms
**Compression Libraries:**
- `deflate`, `inflate` (zlib)
- `LZ4_compress`, `LZ4_decompress`
- Often used before exfiltration
### 4. Cross-Reference Strategy
For each suspicious import:
1. Find all call sites: `xrefs_list(name="<import_name>")`
2. Analyze calling functions: Look for the orchestrating function
3. Check data flow: What data reaches these calls?
### 5. Priority Functions to Analyze
Based on the imports found, prioritize:
1. Functions that call multiple suspicious APIs
2. Functions that set up network connections
3. Functions that manipulate other processes
4. Functions referenced from entry points
## Recommended Next Steps
- `xrefs_list(name="<suspicious_import>")` - Find usage locations
- `functions_decompile(address="<caller>")` - Analyze calling code
- `analysis_get_callgraph(name="<orchestrator>")` - Map the attack flow
- `data_list_strings(grep="http|ftp|\\\\\\\\")` - Find network destinations
"""
    return {
        "prompt": prompt_text,
        "context": {
            "program_info": program_info,
            "capabilities": {k: [fn.get('name') for fn in v[:10]] for k, v in capability_results.items()},
            "total_imports_analyzed": len(all_found),
            "suspicious_patterns": suspicious_combos
        }
    }
@mcp.prompt("find_authentication")
async def find_authentication_prompt(port: int = None, ctx: Context = None):
    """A prompt to locate authentication, authorization, and credential handling code

    Helps find password validation, license checks, session management, and access control.
    Reports progress during multi-pattern scanning.

    Function names are scanned for auth keywords, defined strings for
    auth-related messages, and function names again for hashing keywords.
    The hits are embedded in a fixed analysis framework.

    Args:
        port: Specific Ghidra instance port (optional)
        ctx: FastMCP context for progress reporting (auto-injected)

    Returns:
        dict with a "prompt" string and a "context" dict summarising the hits
    """
    port = _get_instance_port(port)

    # Keywords matched against function names (every entry is scanned).
    auth_patterns = [
        "auth", "login", "logon", "password", "passwd", "credential",
        "verify", "validate", "check", "license", "serial", "key",
        "token", "session", "permission", "access", "privilege"
    ]
    # Keywords matched against defined strings; only the first 6 are scanned.
    auth_string_patterns = [
        "password", "invalid", "incorrect", "denied", "authorized",
        "authentication", "license", "expired", "trial", "registered"
    ]
    scanned_string_patterns = auth_string_patterns[:6]
    # Hash/KDF keywords matched against function names.
    crypto_patterns = ["hash", "sha", "md5", "bcrypt", "hmac", "pbkdf"]

    # Step numbering is derived from the pattern lists so the reported total
    # cannot drift from the work actually done (currently 1 + 17 + 6 + 6 = 30).
    strings_first_step = 2 + len(auth_patterns)
    crypto_first_step = strings_first_step + len(scanned_string_patterns)
    total_steps = crypto_first_step + len(crypto_patterns) - 1

    # Get program info
    await report_step(ctx, 1, total_steps, "Getting program info")
    program_info = ghidra_instance(port=port)

    # Search for authentication-related function names
    auth_funcs = []
    for idx, pattern in enumerate(auth_patterns, start=2):
        await report_step(ctx, idx, total_steps, f"Scanning functions for '{pattern}'")
        funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=10)
        if isinstance(funcs, dict):
            for f in funcs.get("functions", funcs.get("items", [])):
                f['_pattern'] = pattern  # remember which keyword matched, for display
                auth_funcs.append(f)

    # Deduplicate by address; the first keyword that matched a function wins
    seen = set()
    unique_auth = []
    for f in auth_funcs:
        addr = f.get("address", f.get("entry_point", str(f)))
        if addr not in seen:
            seen.add(addr)
            unique_auth.append(f)

    # Search for authentication-related strings
    auth_strings = []
    for idx, pattern in enumerate(scanned_string_patterns, start=strings_first_step):
        await report_step(ctx, idx, total_steps, f"Scanning strings for '{pattern}'")
        strings = data_list_strings(port=port, grep=pattern, grep_ignorecase=True, page_size=8)
        if isinstance(strings, dict):
            for s in strings.get("strings", strings.get("items", [])):
                s['_pattern'] = pattern
                auth_strings.append(s)

    # Search for crypto functions often used in auth (at most 3 hits kept per keyword)
    crypto_auth = []
    for idx, pattern in enumerate(crypto_patterns, start=crypto_first_step):
        await report_step(ctx, idx, total_steps, f"Scanning crypto '{pattern}'")
        funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=5)
        if isinstance(funcs, dict):
            crypto_auth.extend(funcs.get("functions", funcs.get("items", []))[:3])

    # Format outputs
    funcs_display = "\n".join([
        f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')} [{f.get('_pattern', '')}]"
        for f in unique_auth[:25]
    ]) if unique_auth else "No obvious authentication functions found"
    strings_display = "\n".join([
        f" {s.get('address', 'N/A')}: \"{str(s.get('value', s.get('string', s)))[:50]}\" [{s.get('_pattern', '')}]"
        for s in auth_strings[:20]
    ]) if auth_strings else "No authentication-related strings found"
    crypto_display = "\n".join([
        f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}"
        for f in crypto_auth[:10]
    ]) if crypto_auth else "No crypto functions found"

    return {
        "prompt": f"""
# Authentication & Authorization Analysis
**Binary**: {program_info.get('program_name', 'unknown')}
## Potential Authentication Functions
```
{funcs_display}
```
## Authentication-Related Strings
```
{strings_display}
```
## Cryptographic Functions (Often Used in Auth)
```
{crypto_display}
```
## Analysis Framework
### 1. Authentication Pattern Recognition
**Password Validation Patterns:**
```c
// Pattern 1: Direct comparison (WEAK)
if (strcmp(input_password, "hardcoded") == 0)
// Pattern 2: Hash comparison (Better)
hash = compute_hash(input_password);
if (memcmp(hash, stored_hash, 32) == 0)
// Pattern 3: API-based (Best)
result = CheckCredentials(username, password);
```
**License Key Validation Patterns:**
```c
// Pattern 1: Checksum validation
if (compute_checksum(key) == expected)
// Pattern 2: Algorithmic (XOR, math operations)
decoded = key ^ magic_constant;
if (decoded % prime == 0)
// Pattern 3: Online validation
result = validate_with_server(key);
```
### 2. Common Vulnerability Points
| Vulnerability | What to Look For |
|--------------|-----------------|
| Hardcoded credentials | String comparisons with constants |
| Weak hashing | MD5/SHA1 without salt |
| Bypassable checks | Single comparison that can be NOPed |
| Logic flaws | Inverted conditions, early returns |
| Timing attacks | Non-constant-time comparisons |
| Default credentials | Strings like "admin", "password", "default" |
### 3. Finding the Auth Decision Point
The critical point is usually:
```
┌─────────────┐
│ Auth Check │
└──────┬──────┘
┌───────┴───────┐
▼ ▼
[SUCCESS] [FAILURE]
Grant Access Deny/Error
```
**To find it:**
1. Locate error strings ("Invalid password", "Access denied")
2. Find xrefs to those strings
3. Look for the conditional branch before the error
4. The other branch leads to success path
### 4. Session Management Analysis
Look for:
- Token generation after successful auth
- Session ID storage (cookies, memory, files)
- Session timeout handling
- Session invalidation on logout
**Session Token Red Flags:**
- Predictable generation (sequential, time-based)
- Insufficient entropy
- No expiration
- Stored in plaintext
### 5. Privilege Escalation Points
Check for:
- Role/permission checks: `if (user.role == ADMIN)`
- Capability flags: `if (flags & CAN_WRITE)`
- Group membership: `IsUserInGroup()`
- File/resource ACLs
### 6. Bypass Strategies (For Security Research)
**Binary Patching Targets:**
- JZ → JNZ (invert condition)
- CALL auth_check → NOP
- Return value modification
**Runtime Bypass:**
- Hook authentication function
- Modify comparison result
- Inject valid session
### 7. Recommended Analysis Flow
```
Step 1: Find auth strings
└─→ "Invalid password", "Access denied", etc.
Step 2: Trace to calling function
└─→ xrefs_list(address="<string_addr>")
Step 3: Analyze the decision logic
└─→ functions_decompile(address="<func>")
Step 4: Find the success path
└─→ What happens when auth succeeds?
Step 5: Map the complete auth flow
└─→ analysis_get_callgraph(address="<auth_func>")
```
## Recommended Next Steps
- `xrefs_list(address="<auth_string>")` - Find code using auth messages
- `functions_decompile(name="<auth_func>")` - Analyze authentication logic
- `data_list_strings(grep="admin|root|password")` - Find potential credentials
- `analysis_get_callgraph(name="<auth_func>")` - Map auth code flow
""",
        "context": {
            "program_info": program_info,
            "auth_functions": [f.get('name') for f in unique_auth[:15]],
            # str() before slicing: the value is not guaranteed to be a string,
            # and the display code above coerces it the same way.
            "auth_strings": [str(s.get('value', s.get('string', '')))[:40] for s in auth_strings[:10]],
            "crypto_functions": [f.get('name') for f in crypto_auth[:10]]
        }
    }
@mcp.prompt("analyze_switch_table")
def analyze_switch_table_prompt(name: str = None, address: str = None, port: int = None):
    """A prompt to analyze switch/dispatch tables for command processing

    Helps reverse engineer command handlers, protocol dispatchers, and menu systems.

    When a target function is given, its decompilation and (truncated)
    disassembly are embedded ahead of the generic analysis framework.

    Args:
        name: Function name containing switch (optional)
        address: Function address containing switch (optional)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict with a "prompt" string and a "context" dict
    """
    port = _get_instance_port(port)
    program_info = ghidra_instance(port=port)

    # Resolve the optional target function; an address wins over a name.
    target_decompiled, target_disasm, target_info = "", "", None
    if address:
        target_decompiled = decompiled_function_by_address(address=address, port=port)
        target_disasm = disassembly_by_address(address=address, port=port)
        target_info = function_info_by_address(address=address, port=port)
    elif name:
        target_decompiled = decompiled_function_by_name(name=name, port=port)
        target_disasm = disassembly_by_name(name=name, port=port)
        target_info = function_info_by_name(name=name, port=port)

    # Look for dispatcher candidates by name. Only the first six keywords are
    # queried, and at most four hits are kept per keyword.
    dispatch_keywords = (
        "dispatch", "handler", "process", "handle", "command", "cmd",
        "opcode", "switch", "route", "execute", "action",
    )
    candidates = []
    for keyword in dispatch_keywords[:6]:
        listing = functions_list(port=port, grep=keyword, grep_ignorecase=True, page_size=8)
        if not isinstance(listing, dict):
            continue
        candidates.extend(listing.get("functions", listing.get("items", []))[:4])

    # Drop repeats, keyed by address (falling back to entry_point, then repr).
    known_addrs = set()
    unique_dispatch = []
    for entry in candidates:
        key = entry.get("address", entry.get("entry_point", str(entry)))
        if key in known_addrs:
            continue
        known_addrs.add(key)
        unique_dispatch.append(entry)

    # The target section is only rendered when decompilation produced something.
    target_section = ""
    if target_decompiled:
        func_name = name or address
        if isinstance(target_info, dict):
            func_name = target_info.get("name", func_name)
        target_section = f"""
## Target Function: {func_name}
### Decompiled Code:
```c
{target_decompiled}
```
### Disassembly (for jump table analysis):
```
{target_disasm[:3000] if target_disasm else "Not available"}
```
"""

    if unique_dispatch:
        dispatch_display = "\n".join(
            f" {fn.get('address', fn.get('entry_point', 'N/A'))}: {fn.get('name', 'unknown')}"
            for fn in unique_dispatch[:15]
        )
    else:
        dispatch_display = "No obvious dispatch functions found"

    context = {
        "program_info": program_info,
        "target_function": target_info,
        "dispatch_functions": [fn.get('name') for fn in unique_dispatch[:15]]
    }
    prompt_text = f"""
# Switch/Dispatch Table Analysis
**Binary**: {program_info.get('program_name', 'unknown')}
{target_section}
## Potential Dispatch Functions
```
{dispatch_display}
```
## Analysis Framework
### 1. Identifying Switch Patterns
**Compiler-Generated Patterns:**
```c
// Direct switch (small, sparse values)
switch(cmd) {{
case 1: handle_read(); break;
case 2: handle_write(); break;
case 5: handle_delete(); break;
}}
```
Assembly: Series of CMP + JE instructions
```c
// Jump table (dense sequential values)
switch(cmd) {{
case 0: case 1: case 2: case 3: ...
}}
```
Assembly: Bounds check + indirect jump via table
```c
// Binary search (many sparse values)
switch(cmd) {{
case 100: case 200: case 500: case 1000: ...
}}
```
Assembly: Nested CMP comparisons
### 2. Jump Table Recognition
**x86/x64 Pattern:**
```asm
cmp eax, MAX_CASE ; Bounds check
ja default_case ; Out of range
mov eax, [jump_table + rax*4] ; Load handler
jmp rax ; Indirect jump
```
**Ghidra Indicators:**
- Look for `switchD_` labels in disassembly
- Check for computed jumps (`jmp [reg + offset]`)
- Find tables of addresses in `.rodata` or `.rdata`
### 3. Extracting Case Handlers
For each case value, document:
| Case | Value | Handler Address | Purpose |
|------|-------|-----------------|---------|
| 0 | 0x00 | 0x401000 | Initialize |
| 1 | 0x01 | 0x401050 | Read data |
| 2 | 0x02 | 0x4010A0 | Write data |
| ... | ... | ... | ... |
### 4. Command Protocol Analysis
**Common Dispatch Architectures:**
```
Type 1: Flat Dispatch
┌──────────────┐
│ Read Command │
└──────┬───────┘
┌──────────────┐
│ switch(cmd) │──→ handler_1()
│ │──→ handler_2()
│ │──→ handler_3()
└──────────────┘
```
```
Type 2: Nested Dispatch
┌──────────────┐
│ Read Group │
└──────┬───────┘
┌──────────────┐ ┌─────────────┐
│switch(group) │──→ │switch(subcmd)│
└──────────────┘ └─────────────┘
```
```
Type 3: Function Pointer Table
┌──────────────────────────────────┐
│ handlers[] = {{h1, h2, h3, ...}}
│ handlers[cmd]() │
└──────────────────────────────────┘
```
### 5. Reverse Engineering Strategy
**Step 1: Find the dispatch point**
- Look for the main switch or function pointer call
- Identify the command/opcode variable
**Step 2: Map all cases**
- Extract all case values
- Find corresponding handler addresses
- Note default/error handling
**Step 3: Analyze each handler**
- What parameters does it receive?
- What actions does it perform?
- What does it return?
**Step 4: Document the protocol**
```
Command Format:
┌────────┬────────┬──────────┐
│ OpCode │ Length │ Payload │
│ 1 byte │ 2 bytes│ N bytes │
└────────┴────────┴──────────┘
OpCode 0x01: READ
Payload: [offset:4][length:4]
Response: [data:length]
OpCode 0x02: WRITE
Payload: [offset:4][length:4][data:length]
Response: [status:1]
```
### 6. Finding Hidden Commands
**Look for:**
- Cases with no obvious string references (debug commands)
- Cases that check additional conditions (privileged commands)
- Default case that does something other than error
- Gaps in sequential case numbers
### 7. Common Pitfalls
- **Virtual dispatch**: C++ vtables look like switch tables
- **String switches**: May use hash-based dispatch
- **Multi-level switches**: Nested command/subcommand structure
- **Indirect handlers**: Function pointers read from data structures
## Recommended Next Steps
- `functions_decompile(address="<handler>")` - Analyze individual handlers
- `xrefs_list(name="<dispatch_func>")` - Find what calls the dispatcher
- `data_list(grep="<near_switch_addr>")` - Find jump tables in data
- `analysis_get_callgraph(address="<dispatch>")` - Map handler relationships
"""
    return {"prompt": prompt_text, "context": context}
@mcp.prompt("find_config_parsing")
async def find_config_parsing_prompt(port: int = None, ctx: Context = None):
    """A prompt to identify configuration file parsing and settings management

    Helps find how a program reads, parses, and stores its configuration.
    Reports progress during multi-category scanning.

    Args:
        port: Specific Ghidra instance port (optional)
        ctx: FastMCP context for progress reporting (auto-injected)

    Returns:
        dict with a "prompt" string and a "context" dict summarising the hits
    """
    def _take(result, key, limit):
        # First `limit` entries under `key` (or "items"); nothing for non-dict replies.
        if not isinstance(result, dict):
            return []
        return result.get(key, result.get("items", []))[:limit]

    def _addr_name_lines(entries):
        # "<address>: <name>" listing shared by the registry and environment sections.
        return "\n".join(
            f" {fn.get('address', fn.get('entry_point', 'N/A'))}: {fn.get('name', 'unknown')}"
            for fn in entries
        )

    def _string_line(item):
        text = str(item.get('value', item.get('string', item)))[:60]
        return f" {item.get('address', 'N/A')}: \"{text}\""

    port = _get_instance_port(port)

    # Progress budget: 1 info + 8 config + 6 strings + 4 registry + 4 env = 23 steps.
    await report_step(ctx, 1, 23, "Getting program info")
    program_info = ghidra_instance(port=port)

    # Steps 2-9: config-related function names (only the first 8 keywords are scanned).
    config_keywords = [
        "config", "setting", "option", "preference", "pref",
        "ini", "json", "xml", "yaml", "toml", "parse",
        "load", "save", "read", "write"
    ]
    tagged_funcs = []
    for step, keyword in enumerate(config_keywords[:8], start=2):
        await report_step(ctx, step, 23, f"Scanning config functions: '{keyword}'")
        found = functions_list(port=port, grep=keyword, grep_ignorecase=True, page_size=8)
        for fn in _take(found, "functions", 4):
            fn['_pattern'] = keyword
            tagged_funcs.append(fn)

    # Deduplicate by address, keeping the first occurrence.
    known_addrs = set()
    unique_config = []
    for fn in tagged_funcs:
        key = fn.get("address", fn.get("entry_point", str(fn)))
        if key in known_addrs:
            continue
        known_addrs.add(key)
        unique_config.append(fn)

    # Steps 10-15: config-related strings (only the first 6 patterns are scanned).
    string_keywords = [
        "\\.ini", "\\.json", "\\.xml", "\\.cfg", "\\.conf",
        "config", "setting", "/etc/", "AppData", "HKEY_"
    ]
    config_strings = []
    for step, keyword in enumerate(string_keywords[:6], start=10):
        await report_step(ctx, step, 23, f"Scanning config strings: '{keyword}'")
        found = data_list_strings(port=port, grep=keyword, grep_ignorecase=True, page_size=8)
        config_strings.extend(_take(found, "strings", 4))

    # Steps 16-19: Windows registry API names (case-sensitive grep).
    registry_funcs = []
    for step, keyword in enumerate(["RegOpen", "RegQuery", "RegSet", "RegGet"], start=16):
        await report_step(ctx, step, 23, f"Scanning registry: '{keyword}'")
        found = functions_list(port=port, grep=keyword, page_size=5)
        registry_funcs.extend(_take(found, "functions", 3))

    # Steps 20-23: environment variable API names (case-sensitive grep).
    env_funcs = []
    for step, keyword in enumerate(["getenv", "GetEnvironmentVariable", "setenv", "putenv"], start=20):
        await report_step(ctx, step, 23, f"Scanning environment: '{keyword}'")
        found = functions_list(port=port, grep=keyword, page_size=3)
        env_funcs.extend(_take(found, "functions", 2))

    # Render each section, with a placeholder line when nothing was found.
    if unique_config:
        config_display = "\n".join(
            f" {fn.get('address', fn.get('entry_point', 'N/A'))}: {fn.get('name', 'unknown')} [{fn.get('_pattern', '')}]"
            for fn in unique_config[:20]
        )
    else:
        config_display = "No config-related functions found"

    if config_strings:
        strings_display = "\n".join(_string_line(item) for item in config_strings[:15])
    else:
        strings_display = "No config-related strings found"

    if registry_funcs:
        registry_display = _addr_name_lines(registry_funcs[:8])
    else:
        registry_display = "No registry functions found"

    if env_funcs:
        env_display = _addr_name_lines(env_funcs[:5])
    else:
        env_display = "No environment functions found"

    context = {
        "program_info": program_info,
        "config_functions": [fn.get('name') for fn in unique_config[:15]],
        "config_strings": [str(item.get('value', item.get('string', '')))[:50] for item in config_strings[:10]],
        "has_registry": len(registry_funcs) > 0,
        "has_env": len(env_funcs) > 0
    }
    prompt_text = f"""
# Configuration Analysis
**Binary**: {program_info.get('program_name', 'unknown')}
## Config-Related Functions
```
{config_display}
```
## Config-Related Strings (File Paths, Keys)
```
{strings_display}
```
## Registry Access (Windows)
```
{registry_display}
```
## Environment Variable Access
```
{env_display}
```
## Analysis Framework
### 1. Configuration Sources
**Priority Order (typical):**
```
1. Command-line arguments (--config=X, -c X)
2. Environment variables ($APP_CONFIG, %APP_CONFIG%)
3. User config file (~/.apprc, %APPDATA%\\app\\config)
4. System config file (/etc/app.conf, %PROGRAMDATA%)
5. Compiled defaults (hardcoded fallbacks)
```
### 2. File Format Patterns
**INI Format:**
```c
// Look for:
GetPrivateProfileString() // Windows API
fgets() + strchr('[') // Manual parsing
sscanf(line, "[%s]", section)
```
**JSON Format:**
```c
// Library indicators:
cJSON_Parse(), cJSON_GetObjectItem() // cJSON
json_loads(), json_object_get() // jansson
nlohmann::json // C++ nlohmann
```
**XML Format:**
```c
// Library indicators:
xmlReadFile(), xmlDocGetRootElement() // libxml2
tinyxml2::XMLDocument // TinyXML2
expat functions (XML_Parse) // Expat
```
**Custom Binary:**
```c
// Look for:
fread(&config_struct, sizeof(...))
Magic number checks at file start
Version field parsing
```
### 3. Registry Configuration (Windows)
**Common Locations:**
```
HKEY_CURRENT_USER\\Software\\<Vendor>\\<App>
HKEY_LOCAL_MACHINE\\Software\\<Vendor>\\<App>
HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Services\\<Service>
```
**Analysis Points:**
- What keys are read vs written?
- Are there fallback values if key missing?
- Is sensitive data stored (credentials, keys)?
### 4. Environment Variables
**Common Patterns:**
```c
// Direct usage
char* value = getenv("APP_DEBUG");
if (value && strcmp(value, "1") == 0) {{
debug_mode = true;
}}
// With defaults
char* path = getenv("APP_CONFIG");
if (!path) path = "/etc/app.conf";
```
**Security Note:** Environment variables can leak to child processes!
### 5. Configuration Structure Mapping
Document the config schema:
```
struct AppConfig {{
// File locations
char log_path[256]; // from: log_file=
char data_dir[256]; // from: data_directory=
// Network settings
char server_host[64]; // from: server=
int server_port; // from: port=
// Feature flags
bool debug_enabled; // from: debug=true/false
int verbosity; // from: verbose=0-3
}}
```
### 6. Default Value Discovery
**Hardcoded defaults reveal expected values:**
```c
// These strings tell you valid options
if (!config.mode)
config.mode = "production"; // Modes: "production", "debug", "test"?
if (config.timeout <= 0)
config.timeout = 30; // Default timeout: 30 seconds
```
### 7. Config Modification Vectors
**For security research:**
- Can config file be written by unprivileged user?
- Are file paths validated (path traversal)?
- Is config file integrity verified?
- Can environment variables override secure settings?
- Are sensitive values encrypted at rest?
### 8. Parsing Vulnerability Patterns
| Pattern | Risk | Example |
|---------|------|---------|
| Unbounded string copy | Buffer overflow | `strcpy(cfg.name, value)` |
| Integer parsing | Overflow | `atoi()` without bounds |
| Path concatenation | Traversal | `sprintf(path, "%s/%s", dir, file)` |
| Format strings | Code exec | `printf(config_value)` |
## Recommended Next Steps
- `xrefs_list(name="<config_func>")` - Find where config is loaded
- `functions_decompile(name="<parse_func>")` - Analyze parsing logic
- `data_list_strings(grep="default|=")` - Find default values
- `structs_list()` - Look for config structure definitions
"""
    return {"prompt": prompt_text, "context": context}
@mcp.prompt("compare_functions")
def compare_functions_prompt(func1_name: str = None, func1_address: str = None,
                             func2_name: str = None, func2_address: str = None,
                             port: int = None):
    """A prompt to compare two functions for similarity analysis

    Useful for identifying library code, patches, or malware variants.

    For each function the decompilation, function info and variable list are
    fetched (an address takes precedence over a name) and embedded, together
    with simple line/variable counts, in a comparison framework.

    Args:
        func1_name: First function name (optional if address provided)
        func1_address: First function address (optional if name provided)
        func2_name: Second function name (optional if address provided)
        func2_address: Second function address (optional if name provided)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict with a "prompt" string and a "context" dict holding per-function
        name, info, line count and variable count
    """
    port = _get_instance_port(port)

    def _load(func_name, func_address):
        """Return (decompiled, info, variables) for one function; address wins over name."""
        if func_address:
            decompiled = decompiled_function_by_address(address=func_address, port=port)
            info = function_info_by_address(address=func_address, port=port)
            vars_result = function_variables_by_address(address=func_address, port=port)
        elif func_name:
            decompiled = decompiled_function_by_name(name=func_name, port=port)
            info = function_info_by_name(name=func_name, port=port)
            vars_result = function_variables_by_name(name=func_name, port=port)
        else:
            # Neither identifier supplied: nothing to fetch.
            return "", None, []
        variables = vars_result.get("variables", []) if isinstance(vars_result, dict) else []
        return decompiled, info, variables

    def _line_count(decompiled):
        # Empty/missing decompilation counts as zero lines rather than one.
        return len(decompiled.split('\n')) if decompiled else 0

    # Get program info
    program_info = ghidra_instance(port=port)

    # Get details for both functions. The disassembly is deliberately not
    # fetched: nothing in the prompt or the context uses it.
    func1_decompiled, func1_info, func1_vars = _load(func1_name, func1_address)
    func2_decompiled, func2_info, func2_vars = _load(func2_name, func2_address)

    # Get function identifiers, preferring the name reported by Ghidra
    func1_id = func1_name or func1_address or "Function 1"
    func2_id = func2_name or func2_address or "Function 2"
    if isinstance(func1_info, dict):
        func1_id = func1_info.get("name", func1_id)
    if isinstance(func2_info, dict):
        func2_id = func2_info.get("name", func2_id)

    # Extract basic metrics
    func1_lines = _line_count(func1_decompiled)
    func2_lines = _line_count(func2_decompiled)
    func1_var_count = len(func1_vars)
    func2_var_count = len(func2_vars)

    return {
        "prompt": f"""
# Function Comparison Analysis
**Binary**: {program_info.get('program_name', 'unknown')}
## Function 1: {func1_id}
**Lines**: {func1_lines} | **Variables**: {func1_var_count}
```c
{func1_decompiled if func1_decompiled else "// Not available"}
```
---
## Function 2: {func2_id}
**Lines**: {func2_lines} | **Variables**: {func2_var_count}
```c
{func2_decompiled if func2_decompiled else "// Not available"}
```
---
## Comparison Framework
### 1. Structural Similarity Analysis
**Control Flow Comparison:**
- Compare number of basic blocks
- Compare branching patterns (if/else, switch, loops)
- Compare nesting depth
- Compare cyclomatic complexity
**Metric Summary:**
| Metric | {func1_id} | {func2_id} | Match |
|--------|------------|------------|-------|
| Line Count | {func1_lines} | {func2_lines} | {'Yes' if abs(func1_lines - func2_lines) < 5 else 'No'} |
| Variables | {func1_var_count} | {func2_var_count} | {'Yes' if abs(func1_var_count - func2_var_count) < 3 else 'No'} |
### 2. Semantic Similarity Analysis
**Look for equivalent operations:**
```
Same Semantics, Different Code:
a = b + c ≡ a = c + b
if (x == 0) ≡ if (!x)
i++ ≡ i = i + 1
ptr->field ≡ (*ptr).field
```
**Compiler Optimization Differences:**
- Inlining decisions
- Loop unrolling
- Register allocation
- Constant propagation
### 3. Difference Categories
| Category | Significance | Example |
|----------|--------------|---------|
| **Cosmetic** | Low | Variable names, whitespace |
| **Optimization** | Low | Compiler choices, register use |
| **Refactoring** | Medium | Code reorganization, extraction |
| **Functional** | High | Different algorithms, new features |
| **Security Patch** | Critical | Bounds checks, validation added |
### 4. Library Function Identification
**If functions appear similar to known libraries:**
Check for signatures of:
- CRT functions (memcpy, strlen, malloc)
- Crypto libraries (AES, SHA implementations)
- Compression (zlib, LZ4)
- Common patterns (linked list ops, hash tables)
**FLIRT-style matching:**
- First N bytes pattern
- Constant values (magic numbers)
- Call patterns
### 5. Patch Analysis (If Comparing Versions)
**Security Patches Often Add:**
```c
// Before (vulnerable)
memcpy(dest, src, len);
// After (patched)
if (len > sizeof(dest)) return ERROR; // ← Added bounds check
memcpy(dest, src, len);
```
**Common Patch Patterns:**
- Added length/bounds validation
- Added NULL pointer checks
- Integer overflow protection
- Changed insecure functions (strcpy → strncpy)
### 6. Malware Variant Analysis
**If Comparing Suspected Variants:**
| Indicator | Meaning |
|-----------|---------|
| Same structure, different strings | Configuration change |
| Same structure, different constants | Key/C2 change |
| Added functions | New capability |
| Removed functions | Slimmed variant |
| Heavy obfuscation changes | Anti-detection update |
### 7. Comparison Techniques
**Manual Diff:**
1. Align similar code sections
2. Mark additions in green
3. Mark deletions in red
4. Mark modifications in yellow
**Automated Approaches:**
- BinDiff / Diaphora (Ghidra plugins)
- Instruction-level hashing
- CFG isomorphism
- Semantic similarity scoring
### 8. Reporting Template
```
Comparison: {func1_id} vs {func2_id}
Similarity Score: XX%
Key Differences:
1. [Location] - [Description of change]
2. [Location] - [Description of change]
Classification:
[ ] Same function (cosmetic differences only)
[ ] Optimized/recompiled version
[ ] Refactored version
[ ] Patched version (security fix)
[ ] Different functionality
[ ] Different function entirely
Notes:
[Your analysis here]
```
## Recommended Next Steps
- `analysis_get_callgraph(address="<func1>")` - Compare call patterns
- `xrefs_list(address="<func1>")` - Compare usage contexts
- `structs_get(name="<struct>")` - Compare data structure usage
- Analyze disassembly for instruction-level differences
""",
        "context": {
            "program_info": program_info,
            "function1": {
                "name": func1_id,
                "info": func1_info,
                "lines": func1_lines,
                "variables": func1_var_count
            },
            "function2": {
                "name": func2_id,
                "info": func2_info,
                "lines": func2_lines,
                "variables": func2_var_count
            }
        }
    }
@mcp.prompt("document_struct")
def document_struct_prompt(name: str, port: int = None):
    """A prompt to comprehensively document a data structure

    Analyzes structure usage across the codebase to determine field purposes.

    The structure definition is fetched by name; functions whose names match
    the structure name and strings matching the first few field names are
    listed alongside a documentation framework.

    Args:
        name: Structure name to document
        port: Specific Ghidra instance port (optional)

    Returns:
        dict with a "prompt" string and a "context" dict (struct name, size,
        field count, first 20 fields, related function names)
    """
    port = _get_instance_port(port)

    # Get program info
    program_info = ghidra_instance(port=port)

    # Get the structure definition
    struct_info = structs_get(name=name, port=port, page_size=100)
    fields = []
    struct_size = 0
    if isinstance(struct_info, dict):
        fields = struct_info.get("fields", struct_info.get("items", []))
        # NOTE(review): formatted with :x below, so this assumes an int size - confirm
        struct_size = struct_info.get("size", 0)

    # Search for functions that reference this struct.
    # grep values elsewhere in this file are regular expressions (e.g. "\\.ini",
    # "admin|root|password"), so the literal name is escaped to keep regex
    # metacharacters in it from changing or breaking the match.
    struct_funcs = functions_list(port=port, grep=re.escape(name), page_size=20)
    related_funcs = []
    if isinstance(struct_funcs, dict):
        related_funcs = struct_funcs.get("functions", struct_funcs.get("items", []))

    # Search for strings that might relate to field names
    # (often debug strings reference struct field names)
    field_names = [f.get('name', '') for f in fields if f.get('name')]
    related_strings = []
    for field_name in field_names[:5]:
        if len(field_name) > 3:  # Skip very short names
            strings = data_list_strings(port=port, grep=re.escape(field_name), page_size=3)
            if isinstance(strings, dict):
                related_strings.extend(strings.get("strings", strings.get("items", []))[:2])

    # Format structure fields
    if fields:
        # Resolve the displayed values first so column widths are measured on
        # exactly what is printed, including the 'unknown' / 'field_N' fallbacks.
        rows = [
            (
                f.get('offset', 0),  # NOTE(review): assumes an int offset (formatted as hex) - confirm
                str(f.get('type', 'unknown')),
                str(f.get('name', 'field_' + str(i))),
                f.get('size', '?'),
            )
            for i, f in enumerate(fields)
        ]
        type_width = max(len(row[1]) for row in rows)
        name_width = max(len(row[2]) for row in rows)
        fields_display = "\n".join(
            f" +{offset:04x} {type_name.ljust(type_width)} {field_label.ljust(name_width)} // {size} bytes"
            for offset, type_name, field_label, size in rows
        )
    else:
        fields_display = " (No fields found)"

    # Format related functions
    funcs_display = "\n".join([
        f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}"
        for f in related_funcs[:15]
    ]) if related_funcs else "No related functions found"

    # Format related strings
    strings_display = "\n".join([
        f" {s.get('address', 'N/A')}: \"{str(s.get('value', s.get('string', s)))[:50]}\""
        for s in related_strings[:10]
    ]) if related_strings else "No related strings found"

    return {
        "prompt": f"""
# Structure Documentation: {name}
**Binary**: {program_info.get('program_name', 'unknown')}
**Structure Size**: {struct_size} bytes (0x{struct_size:x})
## Field Layout
```
{fields_display}
```
## Functions Referencing This Structure
```
{funcs_display}
```
## Related Strings
```
{strings_display}
```
## Documentation Framework
### 1. Structure Purpose Analysis
**Determine the struct's role:**
- Is it a configuration structure?
- Is it a protocol message/packet?
- Is it an internal state tracker?
- Is it an API/ABI type?
- Is it a file format header?
### 2. Field Documentation Template
For each field, document:
```
┌─────────────────────────────────────────────────────────────┐
│ Field: [name] │
│ Offset: 0x[offset] Size: [bytes] Type: [type] │
├─────────────────────────────────────────────────────────────┤
│ Purpose: [What this field represents] │
│ Valid Values: [Range, enum values, or constraints] │
│ Set By: [Function(s) that write this field] │
│ Used By: [Function(s) that read this field] │
│ Notes: [Special considerations, endianness, etc.] │
└─────────────────────────────────────────────────────────────┘
```
### 3. Common Field Patterns
**Identification Fields:**
- Magic numbers (file/protocol signatures)
- Version fields
- Type/opcode discriminators
- Size/length fields
**Data Fields:**
- Pointers to dynamic data
- Inline arrays/strings
- Numeric values
- Flags/bitfields
**Linkage Fields:**
- Next/prev pointers (linked lists)
- Parent/child pointers (trees)
- Hash table chains
- Reference counts
### 4. Bitfield Analysis
If a field appears to be flags:
```
Field: flags (offset 0x10, 4 bytes)
Bit 0 (0x00000001): INITIALIZED
Bit 1 (0x00000002): CONNECTED
Bit 2 (0x00000004): AUTHENTICATED
Bit 3 (0x00000008): ENCRYPTED
Bits 4-7: Reserved
Bits 8-15: State enum (0-255)
Bits 16-31: Error code
```
### 5. Structure Relationship Mapping
```
┌──────────────┐
{name}
└──────┬───────┘
┌───────────────┼───────────────┐
▼ ▼ ▼
┌────────────┐ ┌────────────┐ ┌────────────┐
│ Related 1 │ │ Related 2 │ │ Related 3 │
└────────────┘ └────────────┘ └────────────┘
```
Document:
- Parent structures (this struct is a field of...)
- Child structures (this struct contains pointers to...)
- Related structures (often used together with...)
### 6. Memory Layout Visualization
```
{name} (0x{struct_size:x} bytes)
┌────────────────────────────────────────┐ 0x0000
│ │
│ [field 1] │
│ │
├────────────────────────────────────────┤ 0x????
│ [field 2] │
├────────────────────────────────────────┤ 0x????
│ [field 3] │
│ │
├────────────────────────────────────────┤ 0x????
│ ... │
└────────────────────────────────────────┘ 0x{struct_size:04x}
```
### 7. Usage Pattern Analysis
**Lifecycle:**
1. **Allocation**: How are instances created?
2. **Initialization**: What sets up initial values?
3. **Usage**: How is it passed around and used?
4. **Cleanup**: How is it destroyed/freed?
**Thread Safety:**
- Is there a mutex/lock field?
- Are accesses atomic?
- Is it passed between threads?
### 8. Documentation Output Format
```markdown
## {name}
**Size**: {struct_size} bytes
**Purpose**: [One-line description]
### Fields
| Offset | Type | Name | Description |
|--------|------|------|-------------|
| 0x0000 | uint32 | magic | File signature (0xDEADBEEF) |
| 0x0004 | uint16 | version | Format version (currently 2) |
| ... | ... | ... | ... |
### Related Functions
- `create_{name}()` - Allocator
- `init_{name}()` - Initializer
- `process_{name}()` - Main handler
- `free_{name}()` - Destructor
### Notes
[Any special considerations, known issues, etc.]
```
## Recommended Next Steps
- `functions_decompile(name="<related_func>")` - See how fields are used
- `xrefs_list(address="<struct_address>")` - Find all references
- `structs_list()` - Find related structures
- For each field: trace reads and writes to understand purpose
""",
        "context": {
            "program_info": program_info,
            "struct_name": name,
            "struct_size": struct_size,
            "field_count": len(fields),
            "fields": fields[:20],
            "related_functions": [f.get('name') for f in related_funcs[:10]]
        }
    }
@mcp.prompt("find_error_handlers")
async def find_error_handlers_prompt(port: int = None, ctx: Context = None):
    """A prompt to map error handling throughout the binary

    Identifies exception handlers, error paths, logging, and cleanup routines.
    Reports progress during multi-category scanning.

    Args:
        port: Specific Ghidra instance port (optional)
        ctx: FastMCP context for progress reporting (auto-injected)

    Returns:
        dict: 'prompt' (the rendered analysis prompt text) and 'context'
              (program info plus the names/strings found by each scan)
    """
    def _page_items(response, primary_key):
        """Return the list of items in a tool response, or [] if there is none.

        Checks ``primary_key`` and ``"items"`` first (the keys this prompt has
        always looked at), then ``"result"``, which is where the paginated
        tools in this file place their current page.
        """
        if not isinstance(response, dict):
            return []
        for key in (primary_key, "items", "result"):
            candidate = response.get(key)
            if isinstance(candidate, list):
                # Skip non-dict entries: every consumer below calls .get() on them
                return [item for item in candidate if isinstance(item, dict)]
        return []

    port = _get_instance_port(port)

    error_patterns = [
        "error", "err", "fail", "exception", "abort", "panic",
        "fatal", "die", "exit", "cleanup", "handler"
    ]
    string_patterns = [
        "error", "failed", "invalid", "cannot", "unable",
        "exception", "warning", "fatal", "critical"
    ]
    # Only the first six string patterns are scanned
    scanned_string_patterns = string_patterns[:6]
    log_patterns = ["log", "print", "debug", "trace", "syslog", "fprintf"]
    cleanup_patterns = ["cleanup", "destroy", "free", "release", "close", "deinit"]
    exit_patterns = ["exit", "abort", "_Exit", "quick_exit", "terminate"]

    # One step for program info plus one per scanned pattern
    # (1 + 11 + 6 + 6 + 6 + 5 = 35 with the lists above)
    total_steps = (
        1 + len(error_patterns) + len(scanned_string_patterns)
        + len(log_patterns) + len(cleanup_patterns) + len(exit_patterns)
    )

    step = 1
    await report_step(ctx, step, total_steps, "Getting program info")
    program_info = ghidra_instance(port=port)

    # Search for error-related function names
    error_funcs = []
    for pattern in error_patterns:
        step += 1
        await report_step(ctx, step, total_steps, f"Scanning error functions: '{pattern}'")
        funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=10)
        for f in _page_items(funcs, "functions")[:5]:
            # Tag a copy so the dict owned by the tool response is not modified
            error_funcs.append({**f, "_pattern": pattern})

    # Deduplicate by address, keeping the first pattern that matched
    seen = set()
    unique_error = []
    for f in error_funcs:
        addr = f.get("address", f.get("entry_point", str(f)))
        if addr not in seen:
            seen.add(addr)
            unique_error.append(f)

    # Search for error-related strings
    error_strings = []
    for pattern in scanned_string_patterns:
        step += 1
        await report_step(ctx, step, total_steps, f"Scanning error strings: '{pattern}'")
        strings = data_list_strings(port=port, grep=pattern, grep_ignorecase=True, page_size=8)
        for s in _page_items(strings, "strings")[:4]:
            error_strings.append({**s, "_pattern": pattern})

    # Search for logging functions
    log_funcs = []
    for pattern in log_patterns:
        step += 1
        await report_step(ctx, step, total_steps, f"Scanning logging: '{pattern}'")
        funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=5)
        log_funcs.extend(_page_items(funcs, "functions")[:3])

    # Search for cleanup/destructor patterns
    cleanup_funcs = []
    for pattern in cleanup_patterns:
        step += 1
        await report_step(ctx, step, total_steps, f"Scanning cleanup: '{pattern}'")
        funcs = functions_list(port=port, grep=pattern, grep_ignorecase=True, page_size=5)
        cleanup_funcs.extend(_page_items(funcs, "functions")[:3])

    # Search for exit/abort functions (whole-name match, optional leading underscore)
    exit_funcs = []
    for pattern in exit_patterns:
        step += 1
        await report_step(ctx, step, total_steps, f"Scanning exit: '{pattern}'")
        funcs = functions_list(port=port, grep=f"^{pattern}$|^_{pattern}$", page_size=3)
        exit_funcs.extend(_page_items(funcs, "functions")[:2])

    # Format outputs
    error_display = "\n".join([
        f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')} [{f.get('_pattern', '')}]"
        for f in unique_error[:20]
    ]) if unique_error else "No error handling functions found"
    strings_display = "\n".join([
        f" {s.get('address', 'N/A')}: \"{str(s.get('value', s.get('string', s)))[:50]}\" [{s.get('_pattern', '')}]"
        for s in error_strings[:15]
    ]) if error_strings else "No error strings found"
    log_display = "\n".join([
        f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}"
        for f in log_funcs[:10]
    ]) if log_funcs else "No logging functions found"
    cleanup_display = "\n".join([
        f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}"
        for f in cleanup_funcs[:10]
    ]) if cleanup_funcs else "No cleanup functions found"
    exit_display = "\n".join([
        f" {f.get('address', f.get('entry_point', 'N/A'))}: {f.get('name', 'unknown')}"
        for f in exit_funcs[:5]
    ]) if exit_funcs else "No exit functions found"

    return {
        "prompt": f"""
# Error Handling Analysis
**Binary**: {program_info.get('program_name', 'unknown')}
## Error Handling Functions
```
{error_display}
```
## Error Messages
```
{strings_display}
```
## Logging Functions
```
{log_display}
```
## Cleanup/Destructor Functions
```
{cleanup_display}
```
## Exit/Abort Functions
```
{exit_display}
```
## Analysis Framework
### 1. Error Handling Patterns
**Pattern 1: Return Code Checking**
```c
ret = do_something();
if (ret < 0) {{
log_error("do_something failed: %d", ret);
return ret; // Propagate error
}}
```
**Pattern 2: Exception-like (goto cleanup)**
```c
int func() {{
if (!(ptr1 = malloc(...))) goto err1;
if (!(ptr2 = malloc(...))) goto err2;
// ... work ...
return SUCCESS;
err2:
free(ptr1);
err1:
return ERROR;
}}
```
**Pattern 3: C++ Exceptions**
```c
try {{
riskyOperation();
}} catch (const std::exception& e) {{
handleError(e);
}}
```
**Pattern 4: Windows SEH**
```c
__try {{
riskyCode();
}} __except(EXCEPTION_EXECUTE_HANDLER) {{
handleException();
}}
```
### 2. Error Propagation Mapping
```
Function A
┌─────────────────┐
│ Function B │◄── Error originates here
└────────┬────────┘
│ returns ERROR
┌─────────────────┐
│ Function A │◄── Propagates error
└────────┬────────┘
│ returns ERROR
┌─────────────────┐
│ Caller │◄── Handles or propagates
└─────────────────┘
```
### 3. Exception Handler Types
**Structured Exception Handling (Windows):**
- Look for `__try`/`__except`/`__finally`
- Check for `_except_handler` functions
- Examine exception filter expressions
**C++ Exception Handling:**
- `__cxa_throw`, `__cxa_begin_catch`, `__cxa_end_catch`
- `.eh_frame` and `.gcc_except_table` sections
- Personality routines (`__gxx_personality_v0`)
**Signal Handlers (Unix):**
- `signal()`, `sigaction()` setup
- Custom handlers for SIGSEGV, SIGBUS, etc.
### 4. Error Code Analysis
**Document the error code scheme:**
```
Error Code Ranges:
0 = Success
1-99 = General errors
100-199 = File errors
200-299 = Network errors
300-399 = Authentication errors
400-499 = Permission errors
-1 = Generic failure
```
**Common Conventions:**
| Convention | Success | Failure |
|------------|---------|---------|
| Unix style | 0 | -1 or negative |
| Boolean | 1/true | 0/false |
| HRESULT | >= 0 | < 0 |
| errno-based | 0 | errno set |
### 5. Cleanup Path Analysis
**Resource Cleanup Checklist:**
- [ ] All malloc'd memory freed
- [ ] All file handles closed
- [ ] All sockets closed
- [ ] All mutexes released
- [ ] All threads joined
- [ ] All temp files removed
**RAII-style (C++):**
```cpp
// Destructor handles cleanup automatically
unique_ptr<Resource> res = make_unique<Resource>();
```
**Manual cleanup (C):**
```c
// Must explicitly free on every exit path
if (error) {{
free(buffer);
close(fd);
return -1;
}}
```
### 6. Logging Analysis
**Log Levels:**
```
TRACE - Detailed debugging
DEBUG - Development info
INFO - Normal operation
WARNING - Potential issues
ERROR - Failures (recoverable)
FATAL - Unrecoverable (exit)
```
**Useful Information in Logs:**
- Error messages reveal expected conditions
- Debug strings reveal internal state
- Trace messages reveal execution flow
- Format strings reveal data structures
### 7. Security Implications
**Error Handling Vulnerabilities:**
| Issue | Risk | Example |
|-------|------|---------|
| Missing error check | High | Use after failed malloc |
| Error info disclosure | Medium | Stack traces to user |
| Inconsistent cleanup | Medium | Memory leaks, resource exhaustion |
| Error-based oracle | Low | Different errors reveal state |
### 8. Documentation Output
```
Error Handling Map for {program_info.get('program_name', 'unknown')}
Central Error Handlers:
- handle_error() @ 0x401000 - Main error router
- panic() @ 0x402000 - Fatal error handler
Error Propagation:
network_read() → connection_handler() → main_loop()
file_parse() → load_config() → init()
Cleanup Routines:
- cleanup_connection() - Closes sockets, frees buffers
- cleanup_session() - Destroys session state
Exit Codes:
0 - Success
1 - Configuration error
2 - Network error
3 - Authentication failure
```
## Recommended Next Steps
- `xrefs_list(address="<error_string>")` - Find error check locations
- `functions_decompile(name="<error_handler>")` - Analyze error processing
- `analysis_get_callgraph(name="<cleanup_func>")` - Map cleanup flow
- Look for functions with many callees to `exit()` or `abort()`
""",
        "context": {
            "program_info": program_info,
            "error_functions": [f.get('name') for f in unique_error[:15]],
            "error_strings": [str(s.get('value', s.get('string', '')))[:40] for s in error_strings[:10]],
            "log_functions": [f.get('name') for f in log_funcs[:10]],
            "cleanup_functions": [f.get('name') for f in cleanup_funcs[:10]],
            "exit_functions": [f.get('name') for f in exit_funcs[:5]]
        }
    }
# ================= MCP Tools =================
# Since we can't use tool groups, we'll use namespaces in the function names
# Instance management tools
@mcp.tool()
def instances_list() -> dict:
    """List all active Ghidra instances

    Runs a quick discovery pass on the default host first, then reports
    every instance currently held in the registry.
    Use instances_discover(host) only if you need to scan a different host.

    Returns:
        dict: Contains 'instances' list; each entry has 'port', 'url',
              'project' and 'file'
    """
    # Refresh the registry before reading it
    _discover_instances(QUICK_DISCOVERY_RANGE, host=None, timeout=0.5)

    listing = []
    with instances_lock:
        for instance_port, details in active_instances.items():
            listing.append({
                "port": instance_port,
                "url": details["url"],
                "project": details.get("project", ""),
                "file": details.get("file", "")
            })
    return {"instances": listing}
@mcp.tool()
def instances_discover(host: str = None) -> dict:
    """Discover Ghidra instances on a specific host

    Intended for scanning a host other than the default one; for normal
    usage instances_list() already auto-discovers on the default host.

    Args:
        host: Host to scan for Ghidra instances (default: configured ghidra_host)

    Returns:
        dict: Contains 'instances' list with all registered instances after
              discovery (same entry format as instances_list)
    """
    _discover_instances(QUICK_DISCOVERY_RANGE, host=host, timeout=0.5)

    def _describe(instance_port, details):
        # One registry entry in the public listing format
        return {
            "port": instance_port,
            "url": details["url"],
            "project": details.get("project", ""),
            "file": details.get("file", "")
        }

    with instances_lock:
        found = [_describe(p, d) for p, d in active_instances.items()]
    return {"instances": found}
@mcp.tool()
def instances_register(port: int, url: str = None) -> str:
    """Register a new Ghidra instance

    Thin tool wrapper that forwards to register_instance().

    Args:
        port: Port number of the Ghidra instance
        url: Optional URL if different from default http://host:port

    Returns:
        str: Confirmation message or error, as produced by register_instance()
    """
    outcome = register_instance(port, url)
    return outcome
@mcp.tool()
def instances_unregister(port: int) -> str:
    """Unregister a Ghidra instance

    Removes the entry for `port` from the instance registry, if present.

    Args:
        port: Port number of the instance to unregister

    Returns:
        str: Confirmation message, or a notice that no such instance exists
    """
    with instances_lock:
        if port not in active_instances:
            return f"No instance found on port {port}"
        del active_instances[port]
    return f"Unregistered instance on port {port}"
@mcp.tool()
def instances_use(port: int) -> str:
    """Set the current working Ghidra instance

    If the port is not yet in the registry, one registration attempt is made
    before giving up.

    Args:
        port: Port number of the instance to use

    Returns:
        str: Confirmation message or error
    """
    global current_instance_port

    # Read the shared registry under the lock, like the other instance tools
    with instances_lock:
        already_known = port in active_instances

    if not already_known:
        # Called outside the lock: register_instance may need to take
        # instances_lock itself (NOTE(review): confirm against its definition)
        register_instance(port)

    with instances_lock:
        # .get() instead of indexing: the entry may have been removed since
        # the check above, which previously surfaced as a KeyError
        info = active_instances.get(port)
        if info is None:
            return f"Error: No active Ghidra instance found on port {port}"
        current_instance_port = port
        program = info.get("file", "unknown program")
        project = info.get("project", "unknown project")

    return f"Now using Ghidra instance on port {port} with {program} in project {project}"
@mcp.tool()
def instances_current() -> dict:
    """Get information about the current working Ghidra instance

    Looks up the instance on the module-level current_instance_port.

    Returns:
        dict: Details about the current instance and program
    """
    selected_port = current_instance_port
    return ghidra_instance(port=selected_port)
# ================= Cursor Management Tools =================
# Tools for managing pagination cursors with session isolation
def _get_session_id(ctx: Context = None) -> str:
    """Derive a session identifier from the FastMCP context

    Resolution order:
      1. "client-<client_id>" when the context carries a client_id
      2. "session-<id(session)>" using the session object's id()
      3. "req-<first 8 chars of request_id>" as a fallback
      4. "default" when no context or none of the above is available

    Security: there is deliberately no parameter for passing a session id
    manually, to prevent session spoofing.
    """
    if not ctx:
        return "default"

    # Explicit client identity wins when present
    client_id = getattr(ctx, 'client_id', None)
    if client_id:
        return f"client-{client_id}"

    # Otherwise key on the session object itself
    session = getattr(ctx, 'session', None)
    if session:
        return f"session-{id(session)}"

    # Last resort: a short prefix of the request id (slicing already leaves
    # ids of eight characters or fewer untouched)
    request_id = getattr(ctx, 'request_id', None)
    if request_id:
        return f"req-{request_id[:8]}"

    return "default"
@mcp.tool()
def cursor_next(cursor_id: str, ctx: Context = None) -> dict:
    """Get the next page of results for a pagination cursor

    Advances the cursor for the caller's session, then returns that page
    together with updated pagination metadata and a human-readable
    '_message' describing progress.

    Args:
        cursor_id: The cursor ID from a previous paginated response
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Next page of results with updated pagination info, or an error
              with code MISSING_PARAMETER / CURSOR_NOT_FOUND
    """
    if not cursor_id:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "cursor_id parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    sid = _get_session_id(ctx)
    state = cursor_manager.advance_cursor(cursor_id, sid)
    if not state:
        return {
            "success": False,
            "error": {
                "code": "CURSOR_NOT_FOUND",
                "message": f"Cursor '{cursor_id}' not found, expired, or belongs to another session"
            },
            "timestamp": int(time.time() * 1000)
        }

    current_page = cursor_manager.get_page(state)
    # Stop advertising the cursor once the last page has been served
    response_cursor = cursor_id if state.has_more else None
    response = {
        "success": True,
        "result": current_page,
        "pagination": {
            "cursor_id": response_cursor,
            "session_id": state.session_id,
            "tool_name": state.tool_name,
            "total_count": state.total_count,
            "filtered_count": state.filtered_count,
            "page_size": state.page_size,
            "current_page": state.current_page,
            "total_pages": state.total_pages,
            "has_more": state.has_more,
            "grep_pattern": state.grep_pattern,
            "items_returned": len(current_page),
            "ttl_remaining": state.ttl_remaining,
        },
        "timestamp": int(time.time() * 1000)
    }

    # Add prominent message for LLMs
    if state.has_more:
        remaining = state.filtered_count - (state.current_page * state.page_size)
        response["_message"] = (
            f"📄 Page {state.current_page}/{state.total_pages}: "
            f"{len(current_page)} items. {remaining} more available. "
            f"Continue with: cursor_next(cursor_id='{cursor_id}')"
        )
    else:
        response["_message"] = (
            f"✅ Final page {state.current_page}/{state.total_pages}: "
            f"{len(current_page)} items. All {state.filtered_count} items retrieved."
        )
    return response
@mcp.tool()
def cursor_list(ctx: Context = None, all_sessions: bool = False) -> dict:
    """List active pagination cursors

    By default only the caller's session is listed; all_sessions=True drops
    the session filter.

    Args:
        ctx: FastMCP context (auto-injected)
        all_sessions: If True, list cursors from all sessions (admin use)

    Returns:
        dict: 'result' with the matching cursors, plus cursor manager 'stats'
    """
    if all_sessions:
        session_filter = None
    else:
        session_filter = _get_session_id(ctx)

    listing = cursor_manager.list_cursors(session_id=session_filter)
    manager_stats = cursor_manager.get_stats()
    return {
        "success": True,
        "result": listing,
        "stats": manager_stats,
        "timestamp": int(time.time() * 1000)
    }
@mcp.tool()
def cursor_delete(cursor_id: str, ctx: Context = None) -> dict:
    """Delete a pagination cursor to free resources

    The deletion is scoped to the caller's session id.

    Args:
        cursor_id: The cursor ID to delete
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Operation result, or an error with code MISSING_PARAMETER /
              CURSOR_NOT_FOUND
    """
    if not cursor_id:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "cursor_id parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    owner = _get_session_id(ctx)
    if not cursor_manager.delete_cursor(cursor_id, owner):
        return {
            "success": False,
            "error": {
                "code": "CURSOR_NOT_FOUND",
                "message": f"Cursor '{cursor_id}' not found or belongs to another session"
            },
            "timestamp": int(time.time() * 1000)
        }

    return {
        "success": True,
        "result": {
            "deleted": True,
            "cursor_id": cursor_id,
            "message": "Cursor deleted successfully"
        },
        "timestamp": int(time.time() * 1000)
    }
@mcp.tool()
def cursor_delete_all(ctx: Context = None) -> dict:
    """Delete all pagination cursors for the current session

    Args:
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: 'result' with 'deleted_count', the 'session_id' that was
              cleared, and a summary 'message'
    """
    session = _get_session_id(ctx)
    removed = cursor_manager.delete_session_cursors(session)
    summary = {
        "deleted_count": removed,
        "session_id": session,
        "message": f"Deleted {removed} cursor(s) for session '{session}'"
    }
    return {
        "success": True,
        "result": summary,
        "timestamp": int(time.time() * 1000)
    }
# ================= End Cursor Management Tools =================
# Function tools
@mcp.tool()
def functions_list(
    name_contains: str = None,
    name_matches_regex: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List functions with cursor-based pagination and grep filtering

    Args:
        name_contains: Substring name filter (case-insensitive, server-side)
        name_matches_regex: Regex name filter (server-side)
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500)
        grep: Regex pattern to filter results client-side (e.g., "main|init", "FUN_.*")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Bypass pagination and return all results (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: List of functions with pagination info. Use cursor_next(cursor_id) for more.
              Error responses from the Ghidra API are returned unchanged.

    Examples:
        # Get first page of all functions
        functions_list()
        # Filter to functions containing "main"
        functions_list(name_contains="main")
        # Client-side grep for FUN_* named functions
        functions_list(grep="^FUN_")
        # Get all functions (bypasses pagination - use carefully!)
        functions_list(return_all=True)
    """
    port_to_use = _get_instance_port(port)
    sid = _get_session_id(ctx)

    # Enforce the documented page-size ceiling, as the sibling function tools
    # do; this also bounds the upstream fetch limit computed below
    page_size = min(page_size, MAX_PAGE_SIZE)

    # Fetch a larger batch from Ghidra to enable client-side pagination
    # We request more than page_size to allow grep filtering
    # NOTE: only this first batch is ever paginated; functions beyond
    # fetch_limit are not requested
    fetch_limit = 5000 if return_all else max(page_size * 10, 500)
    params = {
        "offset": 0,
        "limit": fetch_limit
    }
    if name_contains:
        params["name_contains"] = name_contains
    if name_matches_regex:
        params["name_matches_regex"] = name_matches_regex

    response = safe_get(port_to_use, "functions", params)
    simplified = simplify_response(response)

    # Handle error responses
    if not isinstance(simplified, dict) or not simplified.get("success", False):
        return simplified

    # Extract the result list
    result_data = simplified.get("result", [])
    if not isinstance(result_data, list):
        return simplified

    # Build query params for cursor hashing
    query_params = {
        "tool": "functions_list",
        "port": port_to_use,
        "name_contains": name_contains,
        "name_matches_regex": name_matches_regex,
        "grep": grep
    }

    # Use the paginate_response helper
    return paginate_response(
        data=result_data,
        query_params=query_params,
        tool_name="functions_list",
        session_id=sid,
        page_size=page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )
@mcp.tool()
def functions_get(name: str = None, address: str = None, port: int = None) -> dict:
    """Get detailed information about a function

    When both identifiers are given, the address takes precedence.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Detailed function information, or a MISSING_PARAMETER error
              when neither identifier is supplied
    """
    if not (name or address):
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    # Names are URL-quoted; addresses are placed in the path as given
    endpoint = f"functions/{address}" if address else f"functions/by-name/{quote(name)}"
    return simplify_response(safe_get(target_port, endpoint))
@mcp.tool()
def functions_decompile(
    name: str = None,
    address: str = None,
    syntax_tree: bool = False,
    style: str = "normalize",
    port: int = None,
    # Pagination parameters (line-based)
    page_size: int = 50,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Get decompiled code for a function with cursor-based line pagination

    The decompiled text is split into numbered lines, paginated (and
    optionally grep-filtered), and returned as both formatted and raw lines.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        syntax_tree: Include syntax tree (default: False)
        style: Decompiler style (default: "normalize")
        port: Specific Ghidra instance port (optional)
        page_size: Lines per page (default: 50, max: 500)
        grep: Regex pattern to filter lines (e.g., "if.*==", "malloc|free")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all lines without pagination (use with caution for large functions)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Decompiled code with pagination. Use cursor_next(cursor_id) for more lines.
              On success 'result' holds 'function', 'code_lines'
              ("<line_num>: <code>") and 'raw_lines', plus any of
              name/address/signature/return_type present in the API result.

    Examples:
        # Get first 50 lines (default)
        functions_decompile(name="main")
        # Search for specific patterns
        functions_decompile(name="main", grep="if.*NULL")
        # Get all lines (for small functions)
        functions_decompile(name="small_func", return_all=True)
    """
    if not name and not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port_to_use = _get_instance_port(port)
    params = {
        "syntax_tree": str(syntax_tree).lower(),
        "style": style
    }
    if address:
        endpoint = f"functions/{address}/decompile"
        func_id = address
    else:
        endpoint = f"functions/by-name/{quote(name)}/decompile"
        func_id = name

    response = safe_get(port_to_use, endpoint, params)
    simplified = simplify_response(response)

    # Pass through anything that is not a successful dict response
    # (same guard as functions_list; avoids calling .get on a non-dict)
    if not isinstance(simplified, dict) or not simplified.get("success", False):
        return simplified

    # Extract the decompiled code and split into lines
    result = simplified.get("result", {})
    code = result.get("code", "") if isinstance(result, dict) else ""
    if not code:
        return simplified  # Return as-is if no code

    # Create line objects with 1-based line numbers so numbering survives
    # grep filtering and paging
    line_objects = [
        {"line_num": line_num, "code": line}
        for line_num, line in enumerate(code.split('\n'), start=1)
    ]

    # Build query params for cursor hashing
    query_params = {
        "tool": "functions_decompile",
        "port": port_to_use,
        "name": name,
        "address": address,
        "style": style,
        "grep": grep
    }
    sid = _get_session_id(ctx)

    # Use pagination system
    paginated = paginate_response(
        data=line_objects,
        query_params=query_params,
        tool_name="functions_decompile",
        session_id=sid,
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )

    # Transform result back to code format with line numbers
    if paginated.get("success"):
        page_lines = paginated.get("result", [])
        # Format as "line_num: code" for clarity
        formatted_lines = [f"{item['line_num']:4d}: {item['code']}" for item in page_lines]
        paginated["result"] = {
            "function": func_id,
            "code_lines": formatted_lines,
            "raw_lines": [item['code'] for item in page_lines]
        }
        # Add function metadata if available
        if isinstance(result, dict):
            for key in ["name", "address", "signature", "return_type"]:
                if key in result:
                    paginated["result"][key] = result[key]
    return paginated
@mcp.tool()
def functions_disassemble(
    name: str = None,
    address: str = None,
    port: int = None,
    # Pagination parameters (instruction-based)
    page_size: int = 50,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Get disassembly for a function with cursor-based instruction pagination

    Structured instructions from the API are used when present; otherwise
    the textual disassembly is split into one entry per non-blank line.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)
        page_size: Instructions per page (default: 50, max: 500)
        grep: Regex pattern to filter instructions (e.g., "CALL", "JMP|JNZ", "MOV.*EAX")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all instructions without pagination
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Disassembly with pagination. Use cursor_next(cursor_id) for more instructions.
              On success 'result' holds 'function' and 'instructions', plus
              any of name/address/entry_point/size present in the API result.

    Examples:
        # Get first 50 instructions
        functions_disassemble(name="main")
        # Find all CALL instructions
        functions_disassemble(name="main", grep="CALL")
        # Find jumps and conditional jumps
        functions_disassemble(name="main", grep="^J")
    """
    if not name and not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port_to_use = _get_instance_port(port)
    if address:
        endpoint = f"functions/{address}/disassembly"
        func_id = address
    else:
        endpoint = f"functions/by-name/{quote(name)}/disassembly"
        func_id = name

    response = safe_get(port_to_use, endpoint)
    simplified = simplify_response(response)

    # Pass through anything that is not a successful dict response
    # (same guard as functions_list; avoids calling .get on a non-dict)
    if not isinstance(simplified, dict) or not simplified.get("success", False):
        return simplified

    # Extract the disassembly - could be text or structured
    result = simplified.get("result", {})

    # Handle different response formats
    if isinstance(result, dict):
        disasm_text = result.get("disassembly", "") or result.get("text", "")
        instructions = result.get("instructions", [])
    elif isinstance(result, str):
        disasm_text = result
        instructions = []
    else:
        disasm_text = ""
        instructions = []

    # If we have structured instructions, use them; otherwise parse text
    if instructions:
        # Already have instruction objects
        line_objects = instructions
    elif disasm_text:
        # Split text into lines; "line_N" uses the position in the original
        # text, so numbering has gaps where blank lines were dropped
        lines = disasm_text.strip().split('\n')
        line_objects = [
            {"addr": f"line_{i+1}", "instruction": line}
            for i, line in enumerate(lines) if line.strip()
        ]
    else:
        return simplified  # Return as-is if no disassembly

    # Build query params for cursor hashing
    query_params = {
        "tool": "functions_disassemble",
        "port": port_to_use,
        "name": name,
        "address": address,
        "grep": grep
    }
    sid = _get_session_id(ctx)

    # Use pagination system
    paginated = paginate_response(
        data=line_objects,
        query_params=query_params,
        tool_name="functions_disassemble",
        session_id=sid,
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )

    # Add function context to result
    if paginated.get("success"):
        page_instructions = paginated.get("result", [])
        paginated["result"] = {
            "function": func_id,
            "instructions": page_instructions
        }
        # Add function metadata if available
        if isinstance(result, dict):
            for key in ["name", "address", "entry_point", "size"]:
                if key in result:
                    paginated["result"][key] = result[key]
    return paginated
@mcp.tool()
def functions_create(address: str, port: int = None) -> dict:
    """Create a new function at the specified address

    Sends a POST to the "functions" endpoint with the address as payload.

    Args:
        address: Memory address in hex format where function starts
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the created function information, or a
              MISSING_PARAMETER error when address is empty
    """
    if not address:
        missing = {
            "code": "MISSING_PARAMETER",
            "message": "Address parameter is required"
        }
        return {
            "success": False,
            "error": missing,
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    created = safe_post(target_port, "functions", {"address": address})
    return simplify_response(created)
@mcp.tool()
def functions_rename(old_name: str = None, address: str = None, new_name: str = "", port: int = None) -> dict:
    """Rename a function

    The function is located by address when one is given, otherwise by its
    current name; the new name is sent as a PATCH payload.

    Args:
        old_name: Current function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with old_name)
        new_name: New function name
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the updated function information, or a
              MISSING_PARAMETER error when an identifier or new_name is missing
    """
    has_target = bool(old_name or address)
    if not has_target or not new_name:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either old_name or address, and new_name parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    endpoint = f"functions/{address}" if address else f"functions/by-name/{quote(old_name)}"
    patched = safe_patch(target_port, endpoint, {"name": new_name})
    return simplify_response(patched)
@mcp.tool()
def functions_set_signature(name: str = None, address: str = None, signature: str = "", port: int = None) -> dict:
    """Set function signature/prototype

    The function is located by address when one is given, otherwise by name;
    the signature is sent as a PATCH payload.

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        signature: New function signature (e.g., "int func(char *data, int size)")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the updated function information, or a
              MISSING_PARAMETER error when an identifier or signature is missing
    """
    has_target = bool(name or address)
    if not has_target or not signature:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address, and signature parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }

    target_port = _get_instance_port(port)
    endpoint = f"functions/{address}" if address else f"functions/by-name/{quote(name)}"
    patched = safe_patch(target_port, endpoint, {"signature": signature})
    return simplify_response(patched)
@mcp.tool()
def functions_get_variables(
    name: str = None,
    address: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Get variables for a function with cursor-based pagination

    Args:
        name: Function name (mutually exclusive with address)
        address: Function address in hex format (mutually exclusive with name)
        port: Specific Ghidra instance port (optional)
        page_size: Variables per page (default: 50, max: 500)
        grep: Regex pattern to filter variables (e.g., "local_", "param", "ptr.*int")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all variables without pagination
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Variables with pagination. Use cursor_next(cursor_id) for more.
              On success 'result' holds 'function' and 'variables', plus any
              of name/address/parameter_count/local_count present in the API
              result. A response with no variables is returned unchanged.

    Examples:
        # Get all local variables
        functions_get_variables(name="main", grep="local_")
        # Find pointer variables
        functions_get_variables(name="main", grep="ptr|\\*")
    """
    if not name and not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either name or address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port_to_use = _get_instance_port(port)
    if address:
        endpoint = f"functions/{address}/variables"
        func_id = address
    else:
        endpoint = f"functions/by-name/{quote(name)}/variables"
        func_id = name

    response = safe_get(port_to_use, endpoint)
    simplified = simplify_response(response)

    # Pass through anything that is not a successful dict response
    # (same guard as functions_list; avoids calling .get on a non-dict)
    if not isinstance(simplified, dict) or not simplified.get("success", False):
        return simplified

    # Extract variables list
    result = simplified.get("result", {})
    variables = result.get("variables", []) if isinstance(result, dict) else []
    if not variables:
        return simplified  # Return as-is if no variables

    # Build query params for cursor hashing
    query_params = {
        "tool": "functions_get_variables",
        "port": port_to_use,
        "name": name,
        "address": address,
        "grep": grep
    }
    sid = _get_session_id(ctx)

    # Use pagination system
    paginated = paginate_response(
        data=variables,
        query_params=query_params,
        tool_name="functions_get_variables",
        session_id=sid,
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )

    # Add function context
    if paginated.get("success"):
        paginated["result"] = {
            "function": func_id,
            "variables": paginated.get("result", [])
        }
        # Preserve other metadata
        if isinstance(result, dict):
            for key in ["name", "address", "parameter_count", "local_count"]:
                if key in result:
                    paginated["result"][key] = result[key]
    return paginated
# Memory tools
@mcp.tool()
def memory_read(address: str, length: int = 16, format: str = "hex", port: int = None) -> dict:
    """Read a run of bytes from program memory

    Args:
        address: Start address in hex format
        length: How many bytes to read (default: 16)
        format: Requested output format - "hex", "base64", or "string" (default: "hex")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: {
            "address": address reported by the server (falls back to the requested one),
            "length": number of bytes read (falls back to the requested length),
            "format": the requested output format,
            "hexBytes": memory contents as a hex string (when present in the response),
            "rawBytes": memory contents as a base64 string (when present in the response),
            "timestamp": response timestamp
        }
        If the response carries no dict-shaped result, the simplified
        response is returned unchanged.
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)

    # The address is sent as a query parameter rather than a path segment
    query = dict(address=address, length=length, format=format)
    simplified = simplify_response(safe_get(instance_port, "memory", query))

    # Anything without a dict-shaped result is handed back untouched
    if "result" not in simplified or not isinstance(simplified["result"], dict):
        return simplified

    payload = simplified["result"]
    now_ms = int(time.time() * 1000)
    memory_info = {
        "success": True,
        "address": payload.get("address", address),
        "length": payload.get("bytesRead", length),
        "format": format,
        "timestamp": simplified.get("timestamp", now_ms)
    }

    # Forward whichever byte representations the server included
    for encoding_key in ("hexBytes", "rawBytes"):
        if encoding_key in payload:
            memory_info[encoding_key] = payload[encoding_key]

    return memory_info
@mcp.tool()
def memory_write(address: str, bytes_data: str, format: str = "hex", port: int = None) -> dict:
    """Write bytes into program memory (use with caution)

    Args:
        address: Target memory address in hex format
        bytes_data: Data to write, encoded according to the 'format' parameter
        format: Encoding of bytes_data - "hex", "base64", or "string" (default: "hex")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with success status
    """
    # Both inputs are mandatory; report the first one that is missing
    required = (
        (address, "Address parameter is required"),
        (bytes_data, "Bytes parameter is required"),
    )
    for value, message in required:
        if not value:
            return {
                "success": False,
                "error": {
                    "code": "MISSING_PARAMETER",
                    "message": message
                },
                "timestamp": int(time.time() * 1000)
            }

    instance_port = _get_instance_port(port)
    body = {"bytes": bytes_data, "format": format}

    # Writes go through the program-scoped endpoint, not the memory read endpoint
    write_endpoint = f"programs/current/memory/{address}"
    return simplify_response(safe_patch(instance_port, write_endpoint, body))
# Xrefs tools
@mcp.tool()
def xrefs_list(
    to_addr: str = None,
    from_addr: str = None,
    type: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List cross-references with filtering and cursor-based pagination

    Args:
        to_addr: Filter references to this address (hexadecimal)
        from_addr: Filter references from this address (hexadecimal)
        type: Filter by reference type (e.g. "CALL", "READ", "WRITE")
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500); larger values are
            clamped to the maximum
        grep: Regex pattern to filter results
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all results without pagination (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Cross-references with pagination metadata and cursor for more
        results, or an error dict when neither address is given or the
        upstream request is unsuccessful
    """
    # At least one of the address parameters must be provided
    if not to_addr and not from_addr:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Either to_addr or from_addr parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port_to_use = _get_instance_port(port)

    # Fetch large batch for client-side pagination
    params = {
        "offset": 0,
        "limit": 10000  # Fetch up to 10K for cursor pagination
    }
    if to_addr:
        params["to_addr"] = to_addr
    if from_addr:
        params["from_addr"] = from_addr
    if type:
        params["type"] = type

    response = safe_get(port_to_use, "xrefs", params)
    simplified = simplify_response(response)

    if not simplified.get("success", False):
        return simplified

    all_xrefs = simplified.get("result", [])

    # Build query params for cursor hashing
    query_params = {
        "tool": "xrefs_list",
        "port": port_to_use,
        "to_addr": to_addr,
        "from_addr": from_addr,
        "type": type,
        "grep": grep
    }

    sid = _get_session_id(ctx)

    return paginate_response(
        data=all_xrefs,
        query_params=query_params,
        tool_name="xrefs_list",
        session_id=sid,
        # Enforce the documented ceiling, matching the other paginated tools
        # in this file that clamp with MAX_PAGE_SIZE
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )
# Data tools
@mcp.tool()
def data_list(
    addr: str = None,
    name: str = None,
    name_contains: str = None,
    type: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List defined data items with filtering and cursor-based pagination

    Args:
        addr: Filter by address (hexadecimal)
        name: Exact name match filter (case-sensitive)
        name_contains: Substring name filter (case-insensitive)
        type: Filter by data type (e.g. "string", "dword")
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500); larger values are
            clamped to the maximum
        grep: Regex pattern to filter results
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all results without pagination (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Data items with pagination metadata and cursor for more results,
        or the simplified upstream response when it is not successful
    """
    port_to_use = _get_instance_port(port)

    # Fetch large batch for client-side pagination
    params = {
        "offset": 0,
        "limit": 10000  # Fetch up to 10K for cursor pagination
    }
    # Only forward the filters the caller actually supplied
    if addr:
        params["addr"] = addr
    if name:
        params["name"] = name
    if name_contains:
        params["name_contains"] = name_contains
    if type:
        params["type"] = type

    response = safe_get(port_to_use, "data", params)
    simplified = simplify_response(response)

    if not simplified.get("success", False):
        return simplified

    all_data = simplified.get("result", [])

    # Build query params for cursor hashing
    query_params = {
        "tool": "data_list",
        "port": port_to_use,
        "addr": addr,
        "name": name,
        "name_contains": name_contains,
        "type": type,
        "grep": grep
    }

    sid = _get_session_id(ctx)

    return paginate_response(
        data=all_data,
        query_params=query_params,
        tool_name="data_list",
        session_id=sid,
        # Enforce the documented ceiling, matching the other paginated tools
        # in this file that clamp with MAX_PAGE_SIZE
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )
@mcp.tool()
def data_create(address: str, data_type: str, size: int = None, port: int = None) -> dict:
    """Define a new data item at the given address

    Args:
        address: Memory address in hex format
        data_type: Data type to apply (e.g. "string", "dword", "byte")
        size: Optional size in bytes for the data item
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the created data information
    """
    if not (address and data_type):
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and data_type parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)

    body = {"address": address, "type": data_type}
    # size is optional; an explicit 0 is still forwarded
    if size is not None:
        body["size"] = size

    return simplify_response(safe_post(instance_port, "data", body))
@mcp.tool()
def data_list_strings(
    filter: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List all defined strings in the binary with cursor-based pagination and grep filtering

    Args:
        filter: Server-side string content filter
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500); larger values are
            clamped to the maximum
        grep: Regex pattern to filter results client-side (e.g., "password|key", "http://")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Bypass pagination and return all strings (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: List of string data with pagination info. Use cursor_next(cursor_id) for more.
        The simplified upstream response is returned unchanged when it is
        unsuccessful or its result is not a list.

    Examples:
        # Get first page of strings
        data_list_strings()

        # Filter to strings containing "error"
        data_list_strings(filter="error")

        # Client-side grep for URLs
        data_list_strings(grep="https?://")

        # Get all strings (bypasses pagination)
        data_list_strings(return_all=True)
    """
    port_to_use = _get_instance_port(port)
    sid = _get_session_id(ctx)

    # Enforce the documented page-size ceiling once, so both the upstream
    # fetch size and the pagination call below stay bounded
    effective_page_size = min(page_size, MAX_PAGE_SIZE)

    # Fetch larger batch for client-side pagination
    fetch_limit = 10000 if return_all else max(effective_page_size * 10, 2000)

    params = {
        "offset": 0,
        "limit": fetch_limit
    }
    if filter:
        params["filter"] = filter

    response = safe_get(port_to_use, "strings", params)
    simplified = simplify_response(response)

    # Handle error responses
    if not isinstance(simplified, dict) or not simplified.get("success", False):
        return simplified

    # Extract the result list
    result_data = simplified.get("result", [])
    if not isinstance(result_data, list):
        return simplified

    # Build query params for cursor hashing
    query_params = {
        "tool": "data_list_strings",
        "port": port_to_use,
        "filter": filter,
        "grep": grep
    }

    # Use the paginate_response helper
    return paginate_response(
        data=result_data,
        query_params=query_params,
        tool_name="data_list_strings",
        session_id=sid,
        page_size=effective_page_size,
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )
@mcp.tool()
def data_rename(address: str, name: str, port: int = None) -> dict:
    """Give a data item a new name

    Args:
        address: Memory address of the data item in hex format
        name: New name to assign
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the updated data information
    """
    if not (address and name):
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and name parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)

    # The rename is posted to the "data" endpoint with a "newName" field
    body = {"address": address, "newName": name}
    return simplify_response(safe_post(instance_port, "data", body))
@mcp.tool()
def data_delete(address: str, port: int = None) -> dict:
    """Remove the data defined at the given address

    Args:
        address: Memory address in hex format
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)
    body = {"address": address, "action": "delete"}
    return simplify_response(safe_post(instance_port, "data/delete", body))
@mcp.tool()
def data_set_type(address: str, data_type: str, port: int = None) -> dict:
    """Change the data type of a data item

    Args:
        address: Memory address in hex format
        data_type: Data type name to apply (e.g. "uint32_t", "char[10]")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with the updated data information
    """
    if not (address and data_type):
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address and data_type parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)
    body = {"address": address, "type": data_type}
    return simplify_response(safe_post(instance_port, "data/type", body))
# Struct tools
@mcp.tool()
def structs_list(
    category: str = None,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """List all struct data types in the program with cursor-based pagination

    Args:
        category: Filter by category path (e.g. "/winapi")
        port: Specific Ghidra instance port (optional)
        page_size: Items per page (default: 50, max: 500); larger values are
            clamped to the maximum
        grep: Regex pattern to filter results (searches struct names)
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all results without pagination (use with caution)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Structs with pagination metadata and cursor for more results,
        or the simplified upstream response when it is not successful
    """
    port_to_use = _get_instance_port(port)

    # Fetch large batch for client-side pagination
    params = {
        "offset": 0,
        "limit": 10000  # Fetch up to 10K for cursor pagination
    }
    if category:
        params["category"] = category

    response = safe_get(port_to_use, "structs", params)
    simplified = simplify_response(response)

    if not simplified.get("success", False):
        return simplified

    all_structs = simplified.get("result", [])

    # Build query params for cursor hashing
    query_params = {
        "tool": "structs_list",
        "port": port_to_use,
        "category": category,
        "grep": grep
    }

    sid = _get_session_id(ctx)

    return paginate_response(
        data=all_structs,
        query_params=query_params,
        tool_name="structs_list",
        session_id=sid,
        # Enforce the documented ceiling, matching the other paginated tools
        # in this file that clamp with MAX_PAGE_SIZE
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )
@mcp.tool()
def structs_get(
    name: str,
    port: int = None,
    # Pagination parameters (field-based)
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Get detailed information about a specific struct including all fields

    Supports pagination for structs with many fields (e.g., large C++ classes).

    Args:
        name: Struct name
        port: Specific Ghidra instance port (optional)
        page_size: Number of fields per page (default: 50, max: 500); larger
            values are clamped to the maximum
        grep: Regex pattern to filter fields (matches field name, type, or comment)
        grep_ignorecase: Case-insensitive grep matching (default: True)
        return_all: Return all fields without pagination (WARNING: large structs may have 100+ fields)
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Struct details with paginated fields list. Structs with 10 or
        fewer fields and no grep pattern are returned as the plain simplified
        response, without pagination.
    """
    if not name:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Struct name parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    port = _get_instance_port(port)
    sid = _get_session_id(ctx)

    params = {"name": name}
    response = safe_get(port, "structs", params)
    simplified = simplify_response(response)

    # Extract struct info and fields for pagination
    if not simplified.get("success", True):
        return simplified

    result = simplified.get("result", simplified)

    # Get struct metadata (preserve everything except fields for pagination)
    struct_info = {}
    fields = []

    if isinstance(result, dict):
        for key, value in result.items():
            if key == "fields" and isinstance(value, list):
                fields = value
            else:
                struct_info[key] = value

    # If no fields or very few, return as-is
    if len(fields) <= 10 and not grep:
        return simplified

    # Build query params for cursor hashing. The grep pattern is part of the
    # query identity (as in the other paginated tools in this file) so that
    # differently-filtered requests for the same struct do not hash alike.
    query_params = {
        "tool": "structs_get",
        "port": port,
        "name": name,
        "grep": grep
    }

    # Paginate fields
    paginated = paginate_response(
        data=fields,
        query_params=query_params,
        tool_name="structs_get",
        session_id=sid,
        # Enforce the documented ceiling, matching the other paginated tools
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )

    # Merge struct metadata with paginated fields
    if paginated.get("success"):
        paginated["struct_name"] = struct_info.get("name", name)
        paginated["struct_size"] = struct_info.get("size", struct_info.get("length"))
        paginated["struct_category"] = struct_info.get("category", struct_info.get("categoryPath"))
        paginated["struct_description"] = struct_info.get("description")
        # The paginated "result" contains the fields
        paginated["fields"] = paginated.pop("result", [])
        # Update message to be struct-specific
        if "_message" in paginated:
            paginated["_message"] = paginated["_message"].replace("items", "fields")

    return paginated
@mcp.tool()
def structs_create(name: str, category: str = None, description: str = None, port: int = None) -> dict:
    """Create a new, empty struct data type

    Args:
        name: Name for the new struct
        category: Category path to place the struct in (e.g. "/custom")
        description: Optional description for the struct
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Created struct information
    """
    if not name:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Struct name parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)

    body = {"name": name}
    # Optional attributes are sent only when non-empty
    for key, value in (("category", category), ("description", description)):
        if value:
            body[key] = value

    return simplify_response(safe_post(instance_port, "structs/create", body))
@mcp.tool()
def structs_add_field(struct_name: str, field_name: str, field_type: str,
                      offset: int = None, comment: str = None, port: int = None) -> dict:
    """Add a new field to an existing struct

    Args:
        struct_name: Name of the struct to modify
        field_name: Name for the new field
        field_type: Data type for the field (e.g. "int", "char", "pointer")
        offset: Specific offset to insert field (optional, appends to end if not specified)
        comment: Optional comment for the field
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with updated struct size and field information
    """
    if not (struct_name and field_name and field_type):
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "struct_name, field_name, and field_type parameters are required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)

    body = {
        "struct": struct_name,
        "fieldName": field_name,
        "fieldType": field_type
    }
    # Offset 0 is a valid position, so test against None rather than truthiness
    if offset is not None:
        body["offset"] = offset
    if comment:
        body["comment"] = comment

    return simplify_response(safe_post(instance_port, "structs/addfield", body))
@mcp.tool()
def structs_update_field(struct_name: str, field_name: str = None, field_offset: int = None,
                         new_name: str = None, new_type: str = None, new_comment: str = None,
                         port: int = None) -> dict:
    """Modify an existing struct field (rename it, retype it, or change its comment)

    Args:
        struct_name: Name of the struct to modify
        field_name: Name of the field to update (use this OR field_offset)
        field_offset: Offset of the field to update (use this OR field_name)
        new_name: New name for the field (optional)
        new_type: New data type for the field (optional, e.g. "int", "pointer")
        new_comment: New comment for the field (optional)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result with old and new field values
    """
    # Validation rules in priority order: (is_violated, message)
    no_selector = not field_name and field_offset is None
    no_change = not new_name and not new_type and new_comment is None
    checks = (
        (not struct_name, "struct_name parameter is required"),
        (no_selector, "Either field_name or field_offset must be provided"),
        (no_change, "At least one of new_name, new_type, or new_comment must be provided"),
    )
    for violated, message in checks:
        if violated:
            return {
                "success": False,
                "error": {
                    "code": "MISSING_PARAMETER",
                    "message": message
                },
                "timestamp": int(time.time() * 1000)
            }

    instance_port = _get_instance_port(port)

    # Offsets and comments use "is not None" so that 0 and "" are still sent;
    # names and types are sent only when non-empty
    optional_entries = (
        ("fieldName", field_name, bool(field_name)),
        ("fieldOffset", field_offset, field_offset is not None),
        ("newName", new_name, bool(new_name)),
        ("newType", new_type, bool(new_type)),
        ("newComment", new_comment, new_comment is not None),
    )
    body = {"struct": struct_name}
    body.update({key: value for key, value, include in optional_entries if include})

    return simplify_response(safe_post(instance_port, "structs/updatefield", body))
@mcp.tool()
def structs_delete(name: str, port: int = None) -> dict:
    """Remove a struct data type

    Args:
        name: Name of the struct to delete
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result confirming deletion
    """
    if not name:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Struct name parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)
    return simplify_response(safe_post(instance_port, "structs/delete", {"name": name}))
# Analysis tools
@mcp.tool()
def analysis_run(port: int = None, analysis_options: dict = None) -> dict:
    """Trigger analysis of the current program

    Args:
        analysis_options: Dictionary of analysis options to enable/disable
            (e.g. {"functionRecovery": True, "dataRefs": False})
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Analysis operation result with status
    """
    instance_port = _get_instance_port(port)
    # A missing or empty options dict is sent as an empty payload
    options = analysis_options if analysis_options else {}
    return simplify_response(safe_post(instance_port, "analysis", options))
@mcp.tool()
def analysis_get_callgraph(
    name: str = None,
    address: str = None,
    max_depth: int = 3,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Retrieve a function call graph, paginating its edges with a cursor

    Args:
        name: Starting function name (mutually exclusive with address)
        address: Starting function address (mutually exclusive with name)
        max_depth: Maximum call depth to analyze (default: 3)
        port: Specific Ghidra instance port (optional)
        page_size: Edges per page (default: 50, max: 500)
        grep: Regex pattern to filter edges (e.g., "malloc|free", "FUN_00")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all edges without pagination
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Call graph with paginated edges. Use cursor_next(cursor_id) for more.

    Examples:
        # Get callgraph, filter for memory functions
        analysis_get_callgraph(name="main", grep="alloc|free|memcpy")

        # Deep analysis with pagination
        analysis_get_callgraph(name="main", max_depth=10, page_size=100)
    """
    instance_port = _get_instance_port(port)

    # Address wins over name; with neither, no selector is sent and the
    # Java endpoint will use the entry point
    if address:
        selector, root = "address", address
    elif name:
        selector, root = "name", name
    else:
        selector, root = None, "entry_point"

    request_params = {"max_depth": max_depth}
    if selector is not None:
        request_params[selector] = root

    simplified = simplify_response(
        safe_get(instance_port, "analysis/callgraph", request_params)
    )
    if not simplified.get("success", False):
        return simplified

    # Pull the graph pieces out of the result, tolerating a non-dict result
    graph = simplified.get("result", {})
    if isinstance(graph, dict):
        edge_list = graph.get("edges", [])
        node_list = graph.get("nodes", [])
    else:
        edge_list, node_list = [], []

    # Nothing to paginate: hand back the response unchanged
    if not edge_list:
        return simplified

    # Identity of this query, used for cursor hashing
    cursor_key = {
        "tool": "analysis_get_callgraph",
        "port": instance_port,
        "name": name,
        "address": address,
        "max_depth": max_depth,
        "grep": grep
    }
    session = _get_session_id(ctx)

    # Only edges are paginated; the node list is attached whole below
    page = paginate_response(
        data=edge_list,
        query_params=cursor_key,
        tool_name="analysis_get_callgraph",
        session_id=session,
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )

    if page.get("success"):
        page_edges = page.get("result", [])
        page["result"] = {
            "root_function": root,
            "max_depth": max_depth,
            "nodes": node_list,
            "edges": page_edges,
            "total_nodes": len(node_list),
        }

    return page
@mcp.tool()
def analysis_get_dataflow(
    address: str,
    direction: str = "forward",
    max_steps: int = 50,
    port: int = None,
    # Pagination parameters
    page_size: int = DEFAULT_PAGE_SIZE,
    grep: str = None,
    grep_ignorecase: bool = True,
    return_all: bool = False,
    ctx: Context = None
) -> dict:
    """Run data flow analysis from an address, paginating the steps with a cursor

    Args:
        address: Starting address in hex format
        direction: "forward" or "backward" (default: "forward")
        max_steps: Maximum analysis steps (default: 50)
        port: Specific Ghidra instance port (optional)
        page_size: Steps per page (default: 50, max: 500)
        grep: Regex pattern to filter steps (e.g., "MOV|LEA", "EAX|RAX")
        grep_ignorecase: Case-insensitive grep (default: True)
        return_all: Return all steps without pagination
        ctx: FastMCP context (auto-injected)

    Returns:
        dict: Data flow steps with pagination. Use cursor_next(cursor_id) for more.

    Examples:
        # Track data flow, filter for memory operations
        analysis_get_dataflow(address="0x401000", grep="MOV|PUSH|POP")

        # Backward flow to find data sources
        analysis_get_dataflow(address="0x401000", direction="backward", grep="LEA|MOV")
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)

    request_params = {
        "address": address,
        "direction": direction,
        "max_steps": max_steps
    }
    simplified = simplify_response(
        safe_get(instance_port, "analysis/dataflow", request_params)
    )
    if not simplified.get("success", False):
        return simplified

    # Pull the step list out of the result, tolerating a non-dict result
    flow = simplified.get("result", {})
    flow_is_dict = isinstance(flow, dict)
    step_list = flow.get("steps", []) if flow_is_dict else []

    # Nothing to paginate: hand back the response unchanged
    if not step_list:
        return simplified

    # Identity of this query, used for cursor hashing
    cursor_key = {
        "tool": "analysis_get_dataflow",
        "port": instance_port,
        "address": address,
        "direction": direction,
        "max_steps": max_steps,
        "grep": grep
    }
    session = _get_session_id(ctx)

    page = paginate_response(
        data=step_list,
        query_params=cursor_key,
        tool_name="analysis_get_dataflow",
        session_id=session,
        page_size=min(page_size, MAX_PAGE_SIZE),
        grep=grep,
        grep_ignorecase=grep_ignorecase,
        return_all=return_all
    )

    if page.get("success"):
        page_steps = page.get("result", [])
        page["result"] = {
            "start_address": address,
            "direction": direction,
            "steps": page_steps,
        }
        # Carry over summary metadata the server supplied alongside the steps
        if flow_is_dict:
            page["result"].update(
                {key: flow[key] for key in ("sources", "sinks", "total_steps") if key in flow}
            )

    return page
@mcp.tool()
def ui_get_current_address(port: int = None) -> dict:
    """Report the address currently selected in Ghidra's UI

    Args:
        port: Specific Ghidra instance port (optional)

    Returns:
        Dict containing address information or error
    """
    instance_port = _get_instance_port(port)
    return simplify_response(safe_get(instance_port, "address"))
@mcp.tool()
def ui_get_current_function(port: int = None) -> dict:
    """Report the function currently selected in Ghidra's UI

    Args:
        port: Specific Ghidra instance port (optional)

    Returns:
        Dict containing function information or error
    """
    instance_port = _get_instance_port(port)
    return simplify_response(safe_get(instance_port, "function"))
@mcp.tool()
def comments_set(address: str, comment: str = "", comment_type: str = "plate", port: int = None) -> dict:
    """Attach a comment to the given address

    Args:
        address: Memory address in hex format
        comment: Comment text (empty string removes comment)
        comment_type: Type of comment - "plate", "pre", "post", "eol", "repeatable" (default: "plate")
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)

    # Both the address and the comment type are part of the endpoint path
    comment_endpoint = f"memory/{address}/comments/{comment_type}"
    return simplify_response(
        safe_post(instance_port, comment_endpoint, {"comment": comment})
    )
@mcp.tool()
def functions_set_comment(address: str, comment: str = "", port: int = None) -> dict:
    """Set a decompiler-friendly comment (function comment first, pre-comment as fallback)

    Args:
        address: Memory address in hex format (preferably function entry point)
        comment: Comment text (empty string removes comment)
        port: Specific Ghidra instance port (optional)

    Returns:
        dict: Operation result
    """
    if not address:
        return {
            "success": False,
            "error": {
                "code": "MISSING_PARAMETER",
                "message": "Address parameter is required"
            },
            "timestamp": int(time.time() * 1000)
        }

    instance_port = _get_instance_port(port)

    # First attempt: PATCH the function itself. Any failure here, including an
    # exception, is only logged to stderr so the fallback below still runs.
    try:
        patch_result = safe_patch(instance_port, f"functions/{address}", {"comment": comment})
        if patch_result.get("success", False):
            return simplify_response(patch_result)
        print(f"Note: Failed to set function comment via PATCH on {address}, falling back. Error: {patch_result.get('error')}", file=sys.stderr)
    except Exception as exc:
        print(f"Exception trying function comment PATCH: {exc}. Falling back.", file=sys.stderr)

    # Second attempt: store the text as a "pre" comment through comments_set
    print(f"Falling back to setting 'pre' comment for address {address}", file=sys.stderr)
    return comments_set(
        address=address,
        comment=comment,
        comment_type="pre",
        port=instance_port,
    )
# ================= Startup =================
def main():
    """Start the bridge: register and discover instances, then serve MCP over stdio."""
    # Register the default-port instance up front
    default_url = f"http://{ghidra_host}:{DEFAULT_GHIDRA_PORT}"
    register_instance(DEFAULT_GHIDRA_PORT, default_url)

    # Startup uses the quick discovery range
    _discover_instances(QUICK_DISCOVERY_RANGE)

    # Periodic discovery runs on a daemon thread in the background
    threading.Thread(
        target=periodic_discovery,
        daemon=True,
        name="MCGhidra-Discovery",
    ).start()

    signal.signal(signal.SIGINT, handle_sigint)
    mcp.run(transport="stdio")
# Script entry point: start the bridge only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()