Add language, base address, and loader support for raw firmware import
Some checks are pending
Build Ghidra Plugin / build (push) Waiting to run

Wire GHIDRA_LANGUAGE, GHIDRA_BASE_ADDRESS, GHIDRA_LOADER through the
Docker entrypoint and MCP tools so raw binaries (e.g., ARM7TDMI firmware)
get the correct processor, memory map, and loader instead of relying on
auto-detection. Auto-sets BinaryLoader when language is specified.

Input validation at both Python and bash layers prevents malformed values
from reaching analyzeHeadless.
This commit is contained in:
Ryan Malloy 2026-03-06 21:46:22 -07:00
parent 83949683ae
commit 0250c2df01
4 changed files with 124 additions and 11 deletions

View File

@ -48,6 +48,9 @@ case "${MCGHIDRA_MODE}" in
echo " MCGHIDRA_MAXMEM - Max JVM heap (default: 2G)" echo " MCGHIDRA_MAXMEM - Max JVM heap (default: 2G)"
echo " PROJECT_NAME - Ghidra project name (default: MCGhidra)" echo " PROJECT_NAME - Ghidra project name (default: MCGhidra)"
echo " PROJECT_DIR - Project directory (default: /projects)" echo " PROJECT_DIR - Project directory (default: /projects)"
echo " GHIDRA_LANGUAGE - Processor language ID (e.g., ARM:LE:32:v4t)"
echo " GHIDRA_BASE_ADDRESS - Base address for raw binaries (e.g., 0x00000000)"
echo " GHIDRA_LOADER - Loader type (e.g., BinaryLoader for raw firmware)"
echo "" echo ""
echo "Starting in wait mode..." echo "Starting in wait mode..."
echo "Container will stay running for debugging or manual operation." echo "Container will stay running for debugging or manual operation."
@ -81,6 +84,36 @@ case "${MCGHIDRA_MODE}" in
-postScript "MCGhidraServer.py" "${MCGHIDRA_PORT}" -postScript "MCGhidraServer.py" "${MCGHIDRA_PORT}"
) )
# Optional: processor/language for raw binaries.
# When GHIDRA_LANGUAGE is set it must look like ARCH:ENDIAN:SIZE:VARIANT;
# a valid value is forwarded to analyzeHeadless as `-processor <id>`.
if [ -n "${GHIDRA_LANGUAGE}" ]; then
    # Match the WHOLE value with bash's [[ =~ ]] instead of `echo | grep`.
    # grep tests each input line on its own, so a multi-line value that
    # contains one well-formed line would pass and the full (malformed)
    # string would be handed to analyzeHeadless. With [[ =~ ]] the ^/$
    # anchors apply to the entire string and the bracket expressions cannot
    # match a newline. (This script already relies on bash arrays.)
    ghidra_language_re='^[A-Za-z0-9_]+:[A-Z]{2}:[0-9]+:[A-Za-z0-9._-]+$'
    if [[ ! "${GHIDRA_LANGUAGE}" =~ ${ghidra_language_re} ]]; then
        echo "ERROR: Invalid GHIDRA_LANGUAGE format: ${GHIDRA_LANGUAGE}"
        echo "Expected: ARCH:ENDIAN:SIZE:VARIANT (e.g., ARM:LE:32:v4t)"
        exit 1
    fi
    ANALYZE_ARGS+=(-processor "${GHIDRA_LANGUAGE}")
fi
# Optional: base address.
# When GHIDRA_BASE_ADDRESS is set it must be hex digits with an optional
# lowercase 0x prefix; a valid value is forwarded as `-loader-baseAddr <addr>`.
if [ -n "${GHIDRA_BASE_ADDRESS}" ]; then
    # Match the WHOLE value with bash's [[ =~ ]] instead of `echo | grep`.
    # grep is line-oriented, so a value with an embedded newline would be
    # accepted as soon as any single line looked like a hex number.
    ghidra_base_address_re='^(0x)?[0-9a-fA-F]+$'
    if [[ ! "${GHIDRA_BASE_ADDRESS}" =~ ${ghidra_base_address_re} ]]; then
        echo "ERROR: Invalid GHIDRA_BASE_ADDRESS format: ${GHIDRA_BASE_ADDRESS}"
        echo "Expected hex: 0x00000000 or 00000000"
        exit 1
    fi
    ANALYZE_ARGS+=(-loader-baseAddr "${GHIDRA_BASE_ADDRESS}")
fi
# Optional: explicit loader (e.g., BinaryLoader for raw firmware).
# When GHIDRA_LOADER is set it must consist only of letters, digits and
# underscores; a valid value is forwarded as `-loader <name>`.
if [ -n "${GHIDRA_LOADER}" ]; then
    # Match the WHOLE value with bash's [[ =~ ]] instead of `echo | grep`.
    # grep is line-oriented, so a value with an embedded newline would be
    # accepted as soon as any single line looked like a loader name.
    ghidra_loader_re='^[A-Za-z0-9_]+$'
    if [[ ! "${GHIDRA_LOADER}" =~ ${ghidra_loader_re} ]]; then
        echo "ERROR: Invalid GHIDRA_LOADER format: ${GHIDRA_LOADER}"
        echo "Expected alphanumeric name (e.g., BinaryLoader)"
        exit 1
    fi
    ANALYZE_ARGS+=(-loader "${GHIDRA_LOADER}")
fi
# Add any extra arguments passed # Add any extra arguments passed
ANALYZE_ARGS+=("$@") ANALYZE_ARGS+=("$@")

View File

@ -1,6 +1,6 @@
[project] [project]
name = "mcghidra" name = "mcghidra"
version = "2026.3.6" version = "2026.3.6.1"
description = "Reverse engineering bridge: multi-instance Ghidra plugin with HATEOAS REST API and MCP server for decompilation, analysis & binary manipulation" description = "Reverse engineering bridge: multi-instance Ghidra plugin with HATEOAS REST API and MCP server for decompilation, analysis & binary manipulation"
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"

View File

@ -11,6 +11,7 @@ import asyncio
import fcntl import fcntl
import json import json
import os import os
import re
import shutil import shutil
import subprocess import subprocess
import time import time
@ -378,6 +379,28 @@ class DockerMixin(MCGhidraMixinBase):
f"{self.LABEL_PREFIX}.pid": str(os.getpid()), f"{self.LABEL_PREFIX}.pid": str(os.getpid()),
} }
@staticmethod
def _validate_ghidra_language(language: str) -> bool:
"""Validate Ghidra language ID format (e.g., ARM:LE:32:v4t)."""
return bool(re.match(r'^[A-Za-z0-9_]+:[A-Z]{2}:[0-9]+:[A-Za-z0-9._-]+$', language))
@staticmethod
def _validate_hex_address(address: str) -> bool:
"""Validate hex address format (e.g., 0x00000000 or 00000000)."""
if not re.match(r'^(0x)?[0-9a-fA-F]+$', address):
return False
addr_str = address[2:] if address.startswith("0x") else address
try:
val = int(addr_str, 16)
return 0 <= val <= 0xFFFFFFFFFFFFFFFF
except ValueError:
return False
@staticmethod
def _validate_loader_name(loader: str) -> bool:
"""Validate Ghidra loader name (alphanumeric + underscore)."""
return bool(re.match(r'^[A-Za-z0-9_]+$', loader))
async def _find_containers_by_label( async def _find_containers_by_label(
self, self,
label_filter: Optional[str] = None, label_filter: Optional[str] = None,
@ -553,6 +576,9 @@ class DockerMixin(MCGhidraMixinBase):
binary_path: str, binary_path: str,
memory: str = "2G", memory: str = "2G",
name: Optional[str] = None, name: Optional[str] = None,
language: Optional[str] = None,
base_address: Optional[str] = None,
loader: Optional[str] = None,
ctx: Optional[Context] = None, ctx: Optional[Context] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Start a MCGhidra Docker container for binary analysis. """Start a MCGhidra Docker container for binary analysis.
@ -569,6 +595,9 @@ class DockerMixin(MCGhidraMixinBase):
binary_path: Path to the binary file to analyze binary_path: Path to the binary file to analyze
memory: Max JVM heap memory (default: 2G) memory: Max JVM heap memory (default: 2G)
name: Container name (auto-generated if not specified) name: Container name (auto-generated if not specified)
language: Ghidra processor language ID for raw binaries (e.g., "ARM:LE:32:v4t")
base_address: Base address for raw binaries (e.g., "0x00000000")
loader: Ghidra loader type (e.g., "BinaryLoader"). Auto-set when language is specified.
Returns: Returns:
Container info including ID, name, port, and API URL Container info including ID, name, port, and API URL
@ -641,6 +670,38 @@ class DockerMixin(MCGhidraMixinBase):
for k, v in labels.items(): for k, v in labels.items():
label_args.extend(["-l", f"{k}={v}"]) label_args.extend(["-l", f"{k}={v}"])
# Validate firmware import parameters
if language and not self._validate_ghidra_language(language):
self.port_pool.release(port)
return {
"error": f"Invalid language format: {language}",
"hint": "Expected ARCH:ENDIAN:SIZE:VARIANT (e.g., ARM:LE:32:v4t)",
}
if base_address and not self._validate_hex_address(base_address):
self.port_pool.release(port)
return {
"error": f"Invalid base address: {base_address}",
"hint": "Expected hex format: 0x00000000 or 00000000",
}
if loader and not self._validate_loader_name(loader):
self.port_pool.release(port)
return {
"error": f"Invalid loader name: {loader}",
"hint": "Expected alphanumeric name (e.g., BinaryLoader)",
}
# Build environment variable arguments
env_args = ["-e", f"MCGHIDRA_MAXMEM={memory}"]
if language:
env_args.extend(["-e", f"GHIDRA_LANGUAGE={language}"])
# Auto-set BinaryLoader when language is explicitly specified
if not loader:
loader = "BinaryLoader"
if base_address:
env_args.extend(["-e", f"GHIDRA_BASE_ADDRESS={base_address}"])
if loader:
env_args.extend(["-e", f"GHIDRA_LOADER={loader}"])
# Start the container # Start the container
run_result = await self._run_docker_cmd( run_result = await self._run_docker_cmd(
[ [
@ -652,8 +713,7 @@ class DockerMixin(MCGhidraMixinBase):
f"{port}:8192", f"{port}:8192",
"-v", "-v",
f"{binary_file.parent}:/binaries:ro", f"{binary_file.parent}:/binaries:ro",
"-e", *env_args,
f"MCGHIDRA_MAXMEM={memory}",
*label_args, *label_args,
"mcghidra:latest", "mcghidra:latest",
f"/binaries/{binary_file.name}", f"/binaries/{binary_file.name}",
@ -663,14 +723,19 @@ class DockerMixin(MCGhidraMixinBase):
container_id = run_result.stdout.strip() container_id = run_result.stdout.strip()
# Track the container in this session # Track the container in this session
self._session_containers[container_id] = { session_info: Dict[str, Any] = {
"name": name, "name": name,
"port": port, "port": port,
"binary": str(binary_file), "binary": str(binary_file),
"memory": memory, "memory": memory,
} }
if language:
session_info["language"] = language
if base_address:
session_info["base_address"] = base_address
self._session_containers[container_id] = session_info
return { result_info: Dict[str, Any] = {
"success": True, "success": True,
"session_id": self.session_id, "session_id": self.session_id,
"container_id": container_id[:12], "container_id": container_id[:12],
@ -684,6 +749,11 @@ class DockerMixin(MCGhidraMixinBase):
f"Use docker_logs('{name}') to monitor progress." f"Use docker_logs('{name}') to monitor progress."
), ),
} }
if language:
result_info["language"] = language
if base_address:
result_info["base_address"] = base_address
return result_info
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
if port is not None: if port is not None:
@ -968,6 +1038,9 @@ class DockerMixin(MCGhidraMixinBase):
async def docker_auto_start( async def docker_auto_start(
self, self,
binary_path: str, binary_path: str,
language: Optional[str] = None,
base_address: Optional[str] = None,
loader: Optional[str] = None,
ctx: Optional[Context] = None, ctx: Optional[Context] = None,
) -> Dict[str, Any]: ) -> Dict[str, Any]:
"""Automatically start a Docker container with intelligent port allocation. """Automatically start a Docker container with intelligent port allocation.
@ -985,6 +1058,9 @@ class DockerMixin(MCGhidraMixinBase):
Args: Args:
binary_path: Path to the binary to analyze binary_path: Path to the binary to analyze
language: Ghidra processor language ID for raw binaries (e.g., "ARM:LE:32:v4t")
base_address: Base address for raw binaries (e.g., "0x00000000")
loader: Ghidra loader type (e.g., "BinaryLoader"). Auto-set when language is specified.
Returns: Returns:
Instance connection info with session ID and port details. Instance connection info with session ID and port details.
@ -1031,7 +1107,11 @@ class DockerMixin(MCGhidraMixinBase):
# Start a new container (port auto-allocated from pool) # Start a new container (port auto-allocated from pool)
start_result = await self.docker_start( start_result = await self.docker_start(
binary_path=binary_path, ctx=ctx binary_path=binary_path,
language=language,
base_address=base_address,
loader=loader,
ctx=ctx,
) )
if not start_result.get("success"): if not start_result.get("success"):

2
uv.lock generated
View File

@ -572,7 +572,7 @@ wheels = [
[[package]] [[package]]
name = "mcghidra" name = "mcghidra"
version = "2026.2.11" version = "2026.3.6.1"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "fastmcp" }, { name = "fastmcp" },