"""Structs mixin for GhydraMCP. Provides tools for struct data type operations. """ from typing import Any, Dict, List, Optional from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_tool, mcp_resource from .base import GhydraMixinBase from ..config import get_config class StructsMixin(GhydraMixinBase): """Mixin for struct operations. Provides tools for: - Listing and searching structs - Getting struct details with field pagination - Creating and modifying structs - Managing struct fields """ @mcp_tool() def structs_list( self, category: Optional[str] = None, port: Optional[int] = None, page_size: int = 50, grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """List all struct data types with cursor-based pagination. Args: category: Filter by category path (e.g. "/winapi") port: Ghidra instance port (optional) page_size: Items per page (default: 50, max: 500) grep: Regex pattern to filter struct names grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all results without pagination fields: Field names to keep (e.g. ['name', 'size']). Reduces response size. ctx: FastMCP context (auto-injected) Returns: Structs with pagination metadata """ try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} config = get_config() params = {"offset": 0, "limit": 10000} if category: params["category"] = category response = self.safe_get(port, "structs", params) simplified = self.simplify_response(response) if not simplified.get("success", True): return simplified all_structs = simplified.get("result", []) if not isinstance(all_structs, list): all_structs = [] query_params = { "tool": "structs_list", "port": port, "category": category, "grep": grep, } session_id = self._get_session_id(ctx) return self.filtered_paginate( data=all_structs, query_params=query_params, tool_name="structs_list", session_id=session_id, page_size=min(page_size, config.max_page_size), grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, fields=fields, ) @mcp_tool() def structs_get( self, name: str, port: Optional[int] = None, page_size: int = 50, grep: Optional[str] = None, grep_ignorecase: bool = True, return_all: bool = False, project_fields: Optional[List[str]] = None, ctx: Optional[Context] = None, ) -> Dict[str, Any]: """Get detailed information about a struct with field pagination. Args: name: Struct name port: Ghidra instance port (optional) page_size: Fields per page (default: 50, max: 500) grep: Regex pattern to filter fields grep_ignorecase: Case-insensitive grep (default: True) return_all: Return all fields without pagination project_fields: Field names to keep per struct field item. Reduces response size. ctx: FastMCP context (auto-injected) Returns: Struct details with paginated fields """ if not name: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Struct name parameter is required", }, } try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} config = get_config() session_id = self._get_session_id(ctx) params = {"name": name} response = self.safe_get(port, "structs", params) simplified = self.simplify_response(response) if not simplified.get("success", True): return simplified result = simplified.get("result", simplified) # Extract struct info and fields struct_info = {} fields = [] if isinstance(result, dict): for key, value in result.items(): if key == "fields" and isinstance(value, list): fields = value else: struct_info[key] = value # If few fields and no grep, return as-is if len(fields) <= 10 and not grep: return simplified query_params = { "tool": "structs_get", "port": port, "name": name, } # Paginate fields paginated = self.filtered_paginate( data=fields, query_params=query_params, tool_name="structs_get", session_id=session_id, page_size=min(page_size, config.max_page_size), grep=grep, grep_ignorecase=grep_ignorecase, return_all=return_all, fields=project_fields, ) # Merge struct metadata with paginated fields (skip if guarded) if paginated.get("success") and not paginated.get("guarded"): paginated["struct_name"] = struct_info.get("name", name) paginated["struct_size"] = struct_info.get("size", struct_info.get("length")) paginated["struct_category"] = struct_info.get("category", struct_info.get("categoryPath")) paginated["struct_description"] = struct_info.get("description") paginated["fields"] = paginated.pop("result", []) if "_message" in paginated: paginated["_message"] = paginated["_message"].replace("items", "fields") return paginated @mcp_tool() def structs_create( self, name: str, category: Optional[str] = None, description: Optional[str] = None, port: Optional[int] = None, ) -> Dict[str, Any]: """Create a new struct data type. Args: name: Name for the new struct category: Category path (e.g. "/custom") description: Optional description port: Ghidra instance port (optional) Returns: Created struct information """ if not name: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Struct name parameter is required", }, } try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} payload = {"name": name} if category: payload["category"] = category if description: payload["description"] = description response = self.safe_post(port, "structs/create", payload) return self.simplify_response(response) @mcp_tool() def structs_add_field( self, struct_name: str, field_name: str, field_type: str, offset: Optional[int] = None, comment: Optional[str] = None, port: Optional[int] = None, ) -> Dict[str, Any]: """Add a field to an existing struct. Args: struct_name: Name of the struct to modify field_name: Name for the new field field_type: Data type for the field offset: Specific offset (appends if not specified) comment: Optional field comment port: Ghidra instance port (optional) Returns: Operation result """ if not struct_name or not field_name or not field_type: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "struct_name, field_name, and field_type are required", }, } try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} payload = { "struct": struct_name, "fieldName": field_name, "fieldType": field_type, } if offset is not None: payload["offset"] = offset if comment: payload["comment"] = comment response = self.safe_post(port, "structs/addfield", payload) return self.simplify_response(response) @mcp_tool() def structs_update_field( self, struct_name: str, field_name: Optional[str] = None, field_offset: Optional[int] = None, new_name: Optional[str] = None, new_type: Optional[str] = None, new_comment: Optional[str] = None, port: Optional[int] = None, ) -> Dict[str, Any]: """Update an existing field in a struct. Args: struct_name: Name of the struct to modify field_name: Name of the field to update (OR field_offset) field_offset: Offset of the field to update (OR field_name) new_name: New name for the field new_type: New data type for the field new_comment: New comment for the field port: Ghidra instance port (optional) Returns: Operation result """ if not struct_name: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "struct_name parameter is required", }, } if not field_name and field_offset is None: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Either field_name or field_offset must be provided", }, } if not new_name and not new_type and new_comment is None: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "At least one of new_name, new_type, or new_comment required", }, } try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} payload = {"struct": struct_name} if field_name: payload["fieldName"] = field_name if field_offset is not None: payload["fieldOffset"] = field_offset if new_name: payload["newName"] = new_name if new_type: payload["newType"] = new_type if new_comment is not None: payload["newComment"] = new_comment response = self.safe_post(port, "structs/updatefield", payload) return self.simplify_response(response) @mcp_tool() def structs_delete( self, name: str, port: Optional[int] = None, ) -> Dict[str, Any]: """Delete a struct data type. Args: name: Name of the struct to delete port: Ghidra instance port (optional) Returns: Operation result """ if not name: return { "success": False, "error": { "code": "MISSING_PARAMETER", "message": "Struct name parameter is required", }, } try: port = self.get_instance_port(port) except ValueError as e: return {"success": False, "error": {"code": "NO_INSTANCE", "message": str(e)}} payload = {"name": name} response = self.safe_post(port, "structs/delete", payload) return self.simplify_response(response) # Resources @mcp_resource(uri="ghidra://instance/{port}/structs") def resource_structs_list(self, port: Optional[int] = None) -> Dict[str, Any]: """MCP Resource: List structs (capped). Args: port: Ghidra instance port Returns: List of structs (capped at 1000) """ try: port = self.get_instance_port(port) except ValueError as e: return {"error": str(e)} config = get_config() cap = config.resource_caps.get("structs", 1000) response = self.safe_get(port, "structs", {"limit": cap}) simplified = self.simplify_response(response) if not simplified.get("success", True): return simplified structs = simplified.get("result", []) if not isinstance(structs, list): structs = [] return { "structs": structs[:cap], "count": len(structs), "capped_at": cap if len(structs) >= cap else None, "_hint": "Use structs_list() tool for full pagination" if len(structs) >= cap else None, }