Add ESP-IDF integration component with two-tier loading
Fix idf.py path bug (tools/idf.py in ESP-IDF v5.x) that prevented auto-detection. Add get_idf_path_found() for toolchain management without requiring a fully configured environment. Tier 1 (idf_path_found): idf_tools_list, idf_tools_check, idf_tools_install, idf_env_info — always available when IDF directory exists with idf_tools.py. Tier 2 (idf_available): idf_build_project, idf_flash_project, idf_monitor — require full IDF env, gate checked at invocation time so they unlock after install without restart. Includes input validation (target names, tool names, baud rates), centralized subprocess cleanup via _run_idf_py, mtime-based cache invalidation for tools.json, and esp://idf/status resource with per-target readiness map.
This commit is contained in:
parent
34bc2e2d86
commit
ef19d19da2
@ -88,6 +88,10 @@ export default defineConfig({
|
||||
label: "Product Catalog",
|
||||
slug: "reference/product-catalog",
|
||||
},
|
||||
{
|
||||
label: "IDF Integration",
|
||||
slug: "reference/idf-integration",
|
||||
},
|
||||
],
|
||||
},
|
||||
{ label: "Resources", slug: "reference/resources" },
|
||||
|
||||
361
docs-site/src/content/docs/reference/idf-integration.mdx
Normal file
361
docs-site/src/content/docs/reference/idf-integration.mdx
Normal file
@ -0,0 +1,361 @@
|
||||
---
|
||||
title: IDF Integration
|
||||
description: ESP-IDF toolchain management and project workflow tools
|
||||
---
|
||||
|
||||
import { Aside, Tabs, TabItem } from '@astrojs/starlight/components';
|
||||
|
||||
The IDF Integration component provides toolchain management and project workflow tools that wrap ESP-IDF's `idf_tools.py` and `idf.py`. It uses a **two-tier loading model**:
|
||||
|
||||
- **Tier 1 (Toolchain Management)**: Available whenever ESP-IDF is found with `idf_tools.py`. List, check, and install toolchains even when the environment is partially set up.
|
||||
- **Tier 2 (Project Workflows)**: Available when the full IDF environment is configured. Build, flash, and monitor projects.
|
||||
|
||||
<Aside type="tip">
|
||||
If Tier 2 tools report the environment is not available, use `idf_tools_check` to identify missing tools and `idf_tools_install` to set them up.
|
||||
</Aside>
|
||||
|
||||
### Target Architecture Mapping
|
||||
|
||||
| Target | Architecture | Notes |
|
||||
|--------|-------------|-------|
|
||||
| `esp32` | Xtensa | Original ESP32 |
|
||||
| `esp32s2` | Xtensa | Single-core, USB-OTG |
|
||||
| `esp32s3` | Xtensa | Dual-core, USB-OTG, AI acceleration |
|
||||
| `esp32c2` | RISC-V | Low-cost, Wi-Fi 4, BLE 5 |
|
||||
| `esp32c3` | RISC-V | Wi-Fi 4, BLE 5, single-core |
|
||||
| `esp32c5` | RISC-V | Wi-Fi 5 (dual-band), BLE 5 |
|
||||
| `esp32c6` | RISC-V | Wi-Fi 6, BLE 5, 802.15.4 |
|
||||
| `esp32c61` | RISC-V | Cost-optimized C6 variant |
|
||||
| `esp32h2` | RISC-V | 802.15.4 (Thread/Zigbee), BLE 5 |
|
||||
| `esp32p4` | RISC-V | High-performance, dual-core 400MHz |
|
||||
|
||||
---
|
||||
|
||||
## Tier 1: Toolchain Management
|
||||
|
||||
These tools are available whenever ESP-IDF is found with `idf_tools.py` present.
|
||||
|
||||
### idf_tools_list
|
||||
|
||||
List all ESP-IDF tools with install status and target support. Reads both `idf_tools.py list` output and `tools.json` metadata.
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Default | Description |
|
||||
|------|------|---------|-------------|
|
||||
| `target` | `str \| None` | `None` | Filter to tools supporting this target (e.g. `"esp32p4"`). Returns all tools if omitted. |
|
||||
|
||||
#### Example
|
||||
|
||||
```python
|
||||
# List all tools
|
||||
result = await client.call_tool("idf_tools_list", {})
|
||||
|
||||
# List tools required for ESP32-P4
|
||||
result = await client.call_tool("idf_tools_list", {
|
||||
"target": "esp32p4"
|
||||
})
|
||||
```
|
||||
|
||||
#### Return Value
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"idf_path": "/home/user/esp/esp-idf",
|
||||
"tool_count": 15,
|
||||
"tools": [
|
||||
{
|
||||
"name": "xtensa-esp-elf",
|
||||
"description": "Toolchain for Xtensa-based ESP chips",
|
||||
"supported_targets": ["esp32", "esp32s2", "esp32s3"],
|
||||
"versions": [
|
||||
{
|
||||
"version": "14.2.0_20241119",
|
||||
"installed": true,
|
||||
"archives": [...]
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### idf_tools_check
|
||||
|
||||
Check which tools are installed vs missing. When a target is specified, cross-references `tools.json` to show which missing tools are needed for that chip.
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Default | Description |
|
||||
|------|------|---------|-------------|
|
||||
| `target` | `str \| None` | `None` | ESP target chip (e.g. `"esp32"`, `"esp32p4"`). When set, highlights tools required for that target. |
|
||||
|
||||
#### Example
|
||||
|
||||
```python
|
||||
# Check all tools
|
||||
result = await client.call_tool("idf_tools_check", {})
|
||||
|
||||
# Check readiness for ESP32-P4 (RISC-V target)
|
||||
result = await client.call_tool("idf_tools_check", {
|
||||
"target": "esp32p4"
|
||||
})
|
||||
```
|
||||
|
||||
#### Return Value
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"idf_path": "/home/user/esp/esp-idf",
|
||||
"installed": ["xtensa-esp-elf 14.2.0_20241119", "..."],
|
||||
"missing": ["riscv32-esp-elf 14.2.0_20241119"],
|
||||
"installed_count": 12,
|
||||
"missing_count": 1,
|
||||
"target": "esp32p4",
|
||||
"architecture": "riscv",
|
||||
"target_ready": false,
|
||||
"missing_for_target": ["riscv32-esp-elf 14.2.0_20241119"],
|
||||
"install_hint": "Run idf_tools_install with targets=['esp32p4'] to install 1 missing tool(s)"
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### idf_tools_install
|
||||
|
||||
Install ESP-IDF toolchains. Specify target chips (which resolves to required tools) or specific tool names.
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Default | Description |
|
||||
|------|------|---------|-------------|
|
||||
| `targets` | `list[str] \| None` | `None` | List of ESP target chips (e.g. `["esp32p4", "esp32c3"]`). Installs all tools required for these targets. |
|
||||
| `tools` | `list[str] \| None` | `None` | List of specific tool names (e.g. `["riscv32-esp-elf"]`). Overrides targets if both are given. |
|
||||
|
||||
<Aside type="caution">
|
||||
Tool installation downloads can be large (100MB+ per toolchain). The operation has a 10-minute timeout.
|
||||
</Aside>
|
||||
|
||||
#### Example
|
||||
|
||||
```python
|
||||
# Install tools for RISC-V targets
|
||||
result = await client.call_tool("idf_tools_install", {
|
||||
"targets": ["esp32p4", "esp32c3"]
|
||||
})
|
||||
|
||||
# Install a specific tool by name
|
||||
result = await client.call_tool("idf_tools_install", {
|
||||
"tools": ["riscv32-esp-elf"]
|
||||
})
|
||||
```
|
||||
|
||||
#### Return Value
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"command": "idf_tools.py install --targets=esp32p4,esp32c3",
|
||||
"output_summary": "... Installing riscv32-esp-elf ... Done",
|
||||
"post_install_missing": [],
|
||||
"post_install_installed_count": 15
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### idf_env_info
|
||||
|
||||
Get current ESP-IDF environment information including version, exported environment variables, and PATH additions.
|
||||
|
||||
#### Parameters
|
||||
|
||||
None.
|
||||
|
||||
#### Example
|
||||
|
||||
```python
|
||||
result = await client.call_tool("idf_env_info", {})
|
||||
```
|
||||
|
||||
#### Return Value
|
||||
|
||||
```json
|
||||
{
|
||||
"success": true,
|
||||
"idf_path": "/home/user/esp/esp-idf",
|
||||
"idf_version": "v5.4",
|
||||
"exported_vars": {
|
||||
"PATH": "/home/user/.espressif/tools/xtensa-esp-elf/14.2.0/.../bin:...",
|
||||
"IDF_PYTHON_ENV_PATH": "/home/user/.espressif/python_env/idf5.4_py3.12_env"
|
||||
},
|
||||
"path_additions": [
|
||||
"/home/user/.espressif/tools/xtensa-esp-elf/14.2.0/.../bin",
|
||||
"/home/user/.espressif/tools/riscv32-esp-elf/14.2.0/.../bin"
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Tier 2: Project Workflows
|
||||
|
||||
These tools require a fully configured ESP-IDF environment. If the environment is incomplete, they return a helpful error pointing to `idf_tools_check` and `idf_tools_install`.
|
||||
|
||||
### idf_build_project
|
||||
|
||||
Build an ESP-IDF project. Sets the target chip and runs `idf.py build`.
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Default | Description |
|
||||
|------|------|---------|-------------|
|
||||
| `project_path` | `str` | *(required)* | Path to ESP-IDF project directory (must contain `CMakeLists.txt`). |
|
||||
| `target` | `str` | `"esp32"` | Target chip -- must be a known target from the architecture table above. |
|
||||
| `clean` | `bool` | `false` | Run `idf.py fullclean` before building. |
|
||||
|
||||
#### Example
|
||||
|
||||
```python
|
||||
result = await client.call_tool("idf_build_project", {
|
||||
"project_path": "~/esp/my-project",
|
||||
"target": "esp32s3"
|
||||
})
|
||||
|
||||
# Clean build
|
||||
result = await client.call_tool("idf_build_project", {
|
||||
"project_path": "~/esp/my-project",
|
||||
"target": "esp32s3",
|
||||
"clean": True
|
||||
})
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### idf_flash_project
|
||||
|
||||
Flash a built ESP-IDF project to a device.
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Default | Description |
|
||||
|------|------|---------|-------------|
|
||||
| `project_path` | `str` | *(required)* | Path to ESP-IDF project directory (must be built). |
|
||||
| `port` | `str` | *(required)* | Serial port or socket URI for the target device. |
|
||||
| `baud` | `int` | `460800` | Flash baud rate. Must be a standard rate (9600, 57600, 115200, 230400, 460800, 921600, 1500000, 2000000). |
|
||||
|
||||
#### Example
|
||||
|
||||
```python
|
||||
result = await client.call_tool("idf_flash_project", {
|
||||
"project_path": "~/esp/my-project",
|
||||
"port": "/dev/ttyUSB0"
|
||||
})
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### idf_monitor
|
||||
|
||||
Capture serial output from an ESP device using IDF monitor. Supports ELF-based address decoding when a project path is provided.
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Default | Description |
|
||||
|------|------|---------|-------------|
|
||||
| `port` | `str` | *(required)* | Serial port or socket URI. |
|
||||
| `project_path` | `str \| None` | `None` | Optional ESP-IDF project path for ELF decoding of crash backtraces. |
|
||||
| `duration` | `int` | `10` | Capture duration in seconds (max 60). |
|
||||
|
||||
#### Example
|
||||
|
||||
```python
|
||||
result = await client.call_tool("idf_monitor", {
|
||||
"port": "/dev/ttyUSB0",
|
||||
"project_path": "~/esp/my-project",
|
||||
"duration": 15
|
||||
})
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Resource
|
||||
|
||||
### esp://idf/status
|
||||
|
||||
Real-time ESP-IDF installation status.
|
||||
|
||||
```json
|
||||
{
|
||||
"available": true,
|
||||
"idf_path": "/home/user/esp/esp-idf",
|
||||
"idf_version": "v5.4",
|
||||
"idf_path_found": true,
|
||||
"idf_fully_available": true,
|
||||
"installed_tools": 14,
|
||||
"missing_tools": 1,
|
||||
"missing_tool_names": ["riscv32-esp-elf 14.2.0_20241119"],
|
||||
"target_readiness": {
|
||||
"esp32": true,
|
||||
"esp32s2": true,
|
||||
"esp32s3": true,
|
||||
"esp32c3": false,
|
||||
"esp32c6": false,
|
||||
"esp32p4": false
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Prompt
|
||||
|
||||
### idf_setup_target
|
||||
|
||||
Generates a guided setup walkthrough for a specific ESP target chip. Shows which tools are required, which are installed/missing, and provides install commands.
|
||||
|
||||
```python
|
||||
prompt = await client.get_prompt("idf_setup_target", {"target": "esp32p4"})
|
||||
```
|
||||
|
||||
Example output:
|
||||
|
||||
```markdown
|
||||
# ESP-IDF Setup for esp32p4 (riscv architecture)
|
||||
|
||||
ESP-IDF path: `/home/user/esp/esp-idf`
|
||||
|
||||
## Tools required for esp32p4
|
||||
|
||||
- **riscv32-esp-elf** v14.2.0_20241119 [MISSING]
|
||||
- **esp-rom-elfs** v20241011 [installed]
|
||||
- **cmake** v3.24.0 [installed]
|
||||
|
||||
## Install missing tools
|
||||
|
||||
Use the `idf_tools_install` tool with `targets=["esp32p4"]`
|
||||
to install 1 missing tool(s).
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Typical Workflow
|
||||
|
||||
<Tabs>
|
||||
<TabItem label="Setup New Target">
|
||||
1. Check what's needed: `idf_tools_check` with `target="esp32p4"`
|
||||
2. Install missing tools: `idf_tools_install` with `targets=["esp32p4"]`
|
||||
3. Verify: `idf_tools_check` with `target="esp32p4"` -- should show `target_ready: true`
|
||||
4. Build: `idf_build_project` with the project path and target
|
||||
</TabItem>
|
||||
<TabItem label="Diagnose Environment">
|
||||
1. Get environment info: `idf_env_info`
|
||||
2. List all tools: `idf_tools_list`
|
||||
3. Check IDF status resource: read `esp://idf/status`
|
||||
4. Use the `idf_setup_target` prompt for guided troubleshooting
|
||||
</TabItem>
|
||||
</Tabs>
|
||||
@ -121,6 +121,29 @@ All tools follow a consistent pattern: they return a JSON object with a `success
|
||||
| `esp_product_recommend` | Recommendations based on use case description |
|
||||
| `esp_product_availability` | Filter by production status, lead time, and MOQ |
|
||||
|
||||
### IDF Tools (4 tools + 1 resource + 1 prompt)
|
||||
|
||||
Toolchain management tools -- available when ESP-IDF is found with `idf_tools.py`.
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `idf_tools_list` | List all IDF tools with install status and target support |
|
||||
| `idf_tools_check` | Check installed vs missing tools, filter by target |
|
||||
| `idf_tools_install` | Install toolchains for target(s) or by tool name |
|
||||
| `idf_env_info` | Current IDF environment variables, version, PATH additions |
|
||||
| `esp://idf/status` *(resource)* | IDF version, installed/missing tools, per-target readiness |
|
||||
| `idf_setup_target` *(prompt)* | Guided setup showing what's needed for a target chip |
|
||||
|
||||
### ESP-IDF Project Workflows (3 tools)
|
||||
|
||||
Project workflow tools -- require a fully configured ESP-IDF environment.
|
||||
|
||||
| Tool | Description |
|
||||
|------|-------------|
|
||||
| `idf_build_project` | Build an ESP-IDF project with `idf.py build` |
|
||||
| `idf_flash_project` | Flash built firmware to a device with `idf.py flash` |
|
||||
| `idf_monitor` | Capture serial output with ELF-based address decoding |
|
||||
|
||||
## Additional References
|
||||
|
||||
<CardGrid>
|
||||
|
||||
@ -9,6 +9,7 @@ from .chip_control import ChipControl
|
||||
from .diagnostics import Diagnostics
|
||||
from .firmware_builder import FirmwareBuilder
|
||||
from .flash_manager import FlashManager
|
||||
from .idf_integration import IDFIntegration
|
||||
from .ota_manager import OTAManager
|
||||
from .partition_manager import PartitionManager
|
||||
from .product_catalog import ProductCatalog
|
||||
@ -30,6 +31,7 @@ COMPONENT_REGISTRY = {
|
||||
"qemu_manager": QemuManager,
|
||||
"product_catalog": ProductCatalog,
|
||||
"smart_backup": SmartBackupManager,
|
||||
"idf_integration": IDFIntegration,
|
||||
}
|
||||
|
||||
__all__ = [
|
||||
@ -44,5 +46,6 @@ __all__ = [
|
||||
"QemuManager",
|
||||
"ProductCatalog",
|
||||
"SmartBackupManager",
|
||||
"IDFIntegration",
|
||||
"COMPONENT_REGISTRY",
|
||||
]
|
||||
|
||||
938
src/mcesptool/components/idf_integration.py
Normal file
938
src/mcesptool/components/idf_integration.py
Normal file
@ -0,0 +1,938 @@
|
||||
"""
|
||||
ESP-IDF Integration Component
|
||||
|
||||
Wraps idf_tools.py and tools.json to provide toolchain management and
|
||||
project workflow tools via MCP. Uses a two-tier loading model:
|
||||
|
||||
Tier 1 (idf_path_found): Toolchain management -- list, check, install tools.
|
||||
Tier 2 (idf_available): Project workflows -- build, flash, monitor.
|
||||
|
||||
Tier 1 tools are always registered when the IDF directory exists with
|
||||
idf_tools.py present. Tier 2 tools are registered but check availability
|
||||
at invocation time, so they unlock after idf_tools_install without restart.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from fastmcp import Context, FastMCP
|
||||
|
||||
from ..config import ESPToolServerConfig
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Target name -> architecture family. Covers ESP-IDF v5.x targets.
|
||||
TARGET_ARCH: dict[str, str] = {
|
||||
"esp32": "xtensa",
|
||||
"esp32s2": "xtensa",
|
||||
"esp32s3": "xtensa",
|
||||
"esp32c2": "riscv",
|
||||
"esp32c3": "riscv",
|
||||
"esp32c5": "riscv",
|
||||
"esp32c6": "riscv",
|
||||
"esp32c61": "riscv",
|
||||
"esp32h2": "riscv",
|
||||
"esp32p4": "riscv",
|
||||
}
|
||||
|
||||
# Validation patterns for user-supplied strings
|
||||
_SAFE_TOOL_NAME = re.compile(r"^[a-zA-Z0-9][a-zA-Z0-9_\-.]+$")
|
||||
_VALID_BAUD_RATES = {9600, 57600, 115200, 230400, 460800, 921600, 1500000, 2000000}
|
||||
|
||||
|
||||
def _validate_target(target: str) -> str:
|
||||
"""Validate and return target, or raise ValueError."""
|
||||
if target not in TARGET_ARCH:
|
||||
raise ValueError(
|
||||
f"Unknown target: {target!r}. "
|
||||
f"Valid targets: {', '.join(sorted(TARGET_ARCH))}"
|
||||
)
|
||||
return target
|
||||
|
||||
|
||||
def _validate_tool_names(names: list[str]) -> list[str]:
|
||||
"""Reject tool names containing flag-like or suspicious characters."""
|
||||
validated = []
|
||||
for name in names:
|
||||
if not _SAFE_TOOL_NAME.match(name) or name.startswith("-"):
|
||||
raise ValueError(f"Invalid tool name: {name!r}")
|
||||
validated.append(name)
|
||||
return validated
|
||||
|
||||
|
||||
def _parse_tools_list(output: str) -> list[dict[str, Any]]:
|
||||
"""Parse ``idf_tools.py list`` text output into structured records.
|
||||
|
||||
The output format is roughly::
|
||||
|
||||
* xtensa-esp-elf-gdb
|
||||
- Version 14.2_20240403
|
||||
- xtensa-esp-elf-gdb-14.2_20240403-x86_64-linux-gnu.tar.gz (installed)
|
||||
"""
|
||||
tools: list[dict[str, Any]] = []
|
||||
current_tool: dict[str, Any] | None = None
|
||||
current_version: dict[str, Any] | None = None
|
||||
|
||||
for line in output.splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
continue
|
||||
|
||||
# Tool header: "* tool-name"
|
||||
if stripped.startswith("* "):
|
||||
if current_tool is not None:
|
||||
if current_version:
|
||||
current_tool["versions"].append(current_version)
|
||||
tools.append(current_tool)
|
||||
current_tool = {"name": stripped[2:].strip(), "versions": []}
|
||||
current_version = None
|
||||
continue
|
||||
|
||||
if current_tool is None:
|
||||
continue
|
||||
|
||||
# Version line: "- Version X.Y.Z"
|
||||
version_match = re.match(r"^-\s+Version\s+(.+)", stripped)
|
||||
if version_match:
|
||||
if current_version:
|
||||
current_tool["versions"].append(current_version)
|
||||
current_version = {
|
||||
"version": version_match.group(1).strip(),
|
||||
"installed": False,
|
||||
"archives": [],
|
||||
}
|
||||
continue
|
||||
|
||||
# Archive line: "- filename.tar.gz (installed)" or just "- filename"
|
||||
if current_version and stripped.startswith("- "):
|
||||
archive_part = stripped[2:].strip()
|
||||
installed = "(installed)" in archive_part
|
||||
if installed:
|
||||
current_version["installed"] = True
|
||||
archive_part = archive_part.replace("(installed)", "").strip()
|
||||
current_version["archives"].append({
|
||||
"file": archive_part,
|
||||
"installed": installed,
|
||||
})
|
||||
|
||||
# Flush final tool
|
||||
if current_tool is not None:
|
||||
if current_version:
|
||||
current_tool["versions"].append(current_version)
|
||||
tools.append(current_tool)
|
||||
|
||||
return tools
|
||||
|
||||
|
||||
def _parse_tools_check(output: str) -> dict[str, Any]:
|
||||
"""Parse ``idf_tools.py check`` output.
|
||||
|
||||
Returns dict with ``installed`` and ``missing`` lists.
|
||||
"""
|
||||
installed: list[str] = []
|
||||
missing: list[str] = []
|
||||
|
||||
for line in output.splitlines():
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
continue
|
||||
# Lines look like: "xtensa-esp-elf 14.2.0_20241119: found" or "... not found"
|
||||
if ": found" in stripped:
|
||||
tool_name = stripped.split(":")[0].strip()
|
||||
installed.append(tool_name)
|
||||
elif "not found" in stripped.lower():
|
||||
tool_name = stripped.split(":")[0].strip()
|
||||
missing.append(tool_name)
|
||||
|
||||
return {"installed": installed, "missing": missing}
|
||||
|
||||
|
||||
def _parse_export_vars(output: str) -> dict[str, str]:
|
||||
"""Parse ``idf_tools.py export --format key-value`` output.
|
||||
|
||||
Lines are ``KEY=VALUE``. Skips comments and blank lines.
|
||||
"""
|
||||
env: dict[str, str] = {}
|
||||
for line in output.splitlines():
|
||||
line = line.strip()
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
if "=" in line:
|
||||
key, _, value = line.partition("=")
|
||||
env[key.strip()] = value.strip()
|
||||
return env
|
||||
|
||||
|
||||
class IDFIntegration:
|
||||
"""ESP-IDF toolchain management and project workflow component"""
|
||||
|
||||
def __init__(self, app: FastMCP, config: ESPToolServerConfig) -> None:
|
||||
self.app = app
|
||||
self.config = config
|
||||
self._tools_json_cache: dict[str, Any] | None = None
|
||||
self._tools_json_mtime: float = 0.0
|
||||
self._register_tools()
|
||||
self._register_resources()
|
||||
self._register_prompts()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Subprocess runner
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _run_idf_tools(
|
||||
self,
|
||||
args: list[str],
|
||||
timeout: float = 120.0,
|
||||
env: dict[str, str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Run ``python3 $IDF_PATH/tools/idf_tools.py <args>`` as async subprocess."""
|
||||
idf_path = self.config.esp_idf_path
|
||||
if not idf_path:
|
||||
return {"success": False, "error": "ESP-IDF path not configured"}
|
||||
|
||||
script = str(idf_path / "tools" / "idf_tools.py")
|
||||
cmd = ["python3", script, *args]
|
||||
|
||||
run_env = dict(os.environ)
|
||||
run_env["IDF_PATH"] = str(idf_path)
|
||||
if env:
|
||||
run_env.update(env)
|
||||
|
||||
proc = None
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=run_env,
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
||||
output = (stdout or b"").decode()
|
||||
err_output = (stderr or b"").decode()
|
||||
|
||||
if proc.returncode != 0:
|
||||
return {
|
||||
"success": False,
|
||||
"error": (err_output or output).strip()[:1000],
|
||||
}
|
||||
|
||||
return {"success": True, "output": output, "stderr": err_output}
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
if proc and proc.returncode is None:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
return {"success": False, "error": f"Timeout after {timeout}s"}
|
||||
except FileNotFoundError:
|
||||
return {"success": False, "error": f"python3 not found or {script} missing"}
|
||||
except Exception as e:
|
||||
if proc and proc.returncode is None:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# tools.json loader
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _load_tools_json(self) -> dict[str, Any] | None:
|
||||
"""Load $IDF_PATH/tools/tools.json with mtime-based cache invalidation."""
|
||||
idf_path = self.config.esp_idf_path
|
||||
if not idf_path:
|
||||
return None
|
||||
|
||||
tools_json_path = idf_path / "tools" / "tools.json"
|
||||
if not tools_json_path.exists():
|
||||
return None
|
||||
|
||||
try:
|
||||
current_mtime = tools_json_path.stat().st_mtime
|
||||
if (
|
||||
self._tools_json_cache is not None
|
||||
and self._tools_json_mtime == current_mtime
|
||||
):
|
||||
return self._tools_json_cache
|
||||
|
||||
loop = asyncio.get_running_loop()
|
||||
data = await loop.run_in_executor(None, tools_json_path.read_text)
|
||||
self._tools_json_cache = json.loads(data)
|
||||
self._tools_json_mtime = current_mtime
|
||||
return self._tools_json_cache
|
||||
except Exception:
|
||||
logger.warning("Failed to load tools.json", exc_info=True)
|
||||
return None
|
||||
|
||||
def _tools_for_target(self, tools_json: dict[str, Any], target: str) -> list[dict[str, Any]]:
|
||||
"""Return tools from tools.json required for a given target chip."""
|
||||
required: list[dict[str, str]] = []
|
||||
for tool in tools_json.get("tools", []):
|
||||
supported = tool.get("supported_targets", [])
|
||||
# "all" or explicit target name
|
||||
if supported == "all" or target in supported:
|
||||
latest = ""
|
||||
for ver in tool.get("versions", []):
|
||||
if ver.get("status") == "recommended":
|
||||
latest = ver.get("name", "")
|
||||
break
|
||||
required.append({
|
||||
"name": tool["name"],
|
||||
"description": tool.get("description", ""),
|
||||
"version": latest,
|
||||
"supported_targets": supported if isinstance(supported, list) else [supported],
|
||||
})
|
||||
return required
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# IDF environment builder
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _build_idf_env(self) -> dict[str, str] | None:
|
||||
"""Run ``idf_tools.py export --format key-value`` and return env dict."""
|
||||
result = await self._run_idf_tools(["export", "--format", "key-value"])
|
||||
if not result["success"]:
|
||||
return None
|
||||
|
||||
exported = _parse_export_vars(result["output"])
|
||||
|
||||
# Merge with current env so PATH additions work
|
||||
env = dict(os.environ)
|
||||
for key, value in exported.items():
|
||||
if key == "PATH":
|
||||
env["PATH"] = value + os.pathsep + env.get("PATH", "")
|
||||
else:
|
||||
env[key] = value
|
||||
|
||||
env["IDF_PATH"] = str(self.config.esp_idf_path)
|
||||
return env
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# idf.py subprocess runner (Tier 2)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
async def _run_idf_py(
|
||||
self,
|
||||
args: list[str],
|
||||
timeout: float = 300.0,
|
||||
) -> dict[str, Any]:
|
||||
"""Run ``python3 $IDF_PATH/tools/idf.py <args>`` with full IDF env.
|
||||
|
||||
Handles subprocess lifecycle: timeout cleanup, kill on cancel.
|
||||
Returns dict with success/output/error like _run_idf_tools.
|
||||
"""
|
||||
if not self.config.get_idf_available():
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Full ESP-IDF environment not available",
|
||||
"hint": "Use idf_tools_check to see what's missing, "
|
||||
"then idf_tools_install to set up toolchains",
|
||||
}
|
||||
|
||||
env = await self._build_idf_env()
|
||||
if not env:
|
||||
return {"success": False, "error": "Failed to export IDF environment"}
|
||||
|
||||
idf_path = self.config.esp_idf_path
|
||||
assert idf_path is not None # guaranteed by get_idf_available()
|
||||
idf_py = str(idf_path / "tools" / "idf.py")
|
||||
cmd = ["python3", idf_py, *args]
|
||||
|
||||
proc = None
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
env=env,
|
||||
)
|
||||
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
|
||||
output = (stdout or b"").decode()
|
||||
err_output = (stderr or b"").decode()
|
||||
|
||||
if proc.returncode != 0:
|
||||
return {
|
||||
"success": False,
|
||||
"error": (err_output + output).strip()[-1000:],
|
||||
}
|
||||
|
||||
return {"success": True, "output": output, "stderr": err_output}
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
if proc and proc.returncode is None:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
return {"success": False, "error": f"Timeout after {timeout}s"}
|
||||
except FileNotFoundError:
|
||||
return {"success": False, "error": f"python3 not found or {idf_py} missing"}
|
||||
except Exception as e:
|
||||
if proc and proc.returncode is None:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
return {"success": False, "error": str(e)}
|
||||
finally:
|
||||
# Guarantee cleanup on cancellation or unexpected error
|
||||
if proc and proc.returncode is None:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Tool registration
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _register_tools(self) -> None:
|
||||
self._register_tier1_tools()
|
||||
self._register_tier2_tools()
|
||||
|
||||
def _register_tier1_tools(self) -> None:
|
||||
"""Toolchain management -- always available when IDF path is found."""
|
||||
|
||||
@self.app.tool("idf_tools_list")
|
||||
async def idf_tools_list(
|
||||
context: Context,
|
||||
target: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""List ESP-IDF tools with install status and target support.
|
||||
|
||||
Shows every tool managed by idf_tools.py, whether each is installed,
|
||||
and which ESP targets it supports. Optionally filter by target chip.
|
||||
|
||||
Args:
|
||||
target: Filter to tools supporting this target (e.g. "esp32p4").
|
||||
Returns all tools if omitted.
|
||||
"""
|
||||
if target:
|
||||
try:
|
||||
_validate_target(target)
|
||||
except ValueError as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
# Get runtime status from idf_tools.py list
|
||||
result = await self._run_idf_tools(["list"])
|
||||
if not result["success"]:
|
||||
return result
|
||||
|
||||
parsed = _parse_tools_list(result["output"])
|
||||
|
||||
# Enrich with tools.json metadata (target support, descriptions)
|
||||
tools_json = await self._load_tools_json()
|
||||
json_index: dict[str, dict[str, Any]] = {}
|
||||
if tools_json:
|
||||
for t in tools_json.get("tools", []):
|
||||
json_index[t["name"]] = t
|
||||
|
||||
enriched: list[dict[str, Any]] = []
|
||||
for tool in parsed:
|
||||
name = tool["name"]
|
||||
meta = json_index.get(name, {})
|
||||
supported = meta.get("supported_targets", [])
|
||||
|
||||
entry = {
|
||||
"name": name,
|
||||
"description": meta.get("description", ""),
|
||||
"supported_targets": supported if isinstance(supported, list) else [supported],
|
||||
"versions": tool["versions"],
|
||||
}
|
||||
|
||||
# Filter by target if requested
|
||||
if target:
|
||||
targets = entry["supported_targets"]
|
||||
if targets != ["all"] and target not in targets:
|
||||
continue
|
||||
|
||||
enriched.append(entry)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"idf_path": str(self.config.esp_idf_path),
|
||||
"tool_count": len(enriched),
|
||||
"tools": enriched,
|
||||
**({"filtered_by_target": target} if target else {}),
|
||||
}
|
||||
|
||||
@self.app.tool("idf_tools_check")
|
||||
async def idf_tools_check(
|
||||
context: Context,
|
||||
target: str | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Check which ESP-IDF tools are installed vs missing.
|
||||
|
||||
Runs ``idf_tools.py check`` to verify tool installation status.
|
||||
When a target is specified, cross-references tools.json to identify
|
||||
which missing tools are needed for that specific chip.
|
||||
|
||||
Args:
|
||||
target: ESP target chip (e.g. "esp32", "esp32p4", "esp32c3").
|
||||
When set, the response highlights tools required for
|
||||
that target and provides an install command.
|
||||
"""
|
||||
if target:
|
||||
try:
|
||||
_validate_target(target)
|
||||
except ValueError as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
result = await self._run_idf_tools(["check"])
|
||||
if not result["success"]:
|
||||
return result
|
||||
|
||||
status = _parse_tools_check(result["output"])
|
||||
|
||||
response: dict[str, Any] = {
|
||||
"success": True,
|
||||
"idf_path": str(self.config.esp_idf_path),
|
||||
"installed": status["installed"],
|
||||
"missing": status["missing"],
|
||||
"installed_count": len(status["installed"]),
|
||||
"missing_count": len(status["missing"]),
|
||||
}
|
||||
|
||||
# If target specified, identify which missing tools matter
|
||||
if target:
|
||||
arch = TARGET_ARCH.get(target)
|
||||
response["target"] = target
|
||||
response["architecture"] = arch or "unknown"
|
||||
|
||||
tools_json = await self._load_tools_json()
|
||||
if tools_json:
|
||||
needed = self._tools_for_target(tools_json, target)
|
||||
needed_names = {t["name"] for t in needed}
|
||||
missing_for_target = [
|
||||
m for m in status["missing"]
|
||||
if any(m.startswith(n) for n in needed_names)
|
||||
]
|
||||
response["tools_for_target"] = needed
|
||||
response["missing_for_target"] = missing_for_target
|
||||
response["target_ready"] = len(missing_for_target) == 0
|
||||
|
||||
if missing_for_target:
|
||||
response["install_hint"] = (
|
||||
f"Run idf_tools_install with targets=['{target}'] "
|
||||
f"to install {len(missing_for_target)} missing tool(s)"
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
@self.app.tool("idf_tools_install")
|
||||
async def idf_tools_install(
|
||||
context: Context,
|
||||
targets: list[str] | None = None,
|
||||
tools: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Install ESP-IDF toolchains for specified targets or tools.
|
||||
|
||||
Runs ``idf_tools.py install`` to download and set up toolchains.
|
||||
Either specify target chips (resolves to required tools automatically)
|
||||
or specific tool names.
|
||||
|
||||
Args:
|
||||
targets: List of ESP target chips (e.g. ["esp32p4", "esp32c3"]).
|
||||
Installs all tools required for these targets.
|
||||
tools: List of specific tool names to install
|
||||
(e.g. ["riscv32-esp-elf"]). Overrides targets if both given.
|
||||
"""
|
||||
args = ["install"]
|
||||
|
||||
if tools:
|
||||
try:
|
||||
validated = _validate_tool_names(tools)
|
||||
except ValueError as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
args.extend(validated)
|
||||
elif targets:
|
||||
try:
|
||||
for t in targets:
|
||||
_validate_target(t)
|
||||
except ValueError as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
args.append(f"--targets={','.join(targets)}")
|
||||
else:
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Specify either 'targets' or 'tools' to install",
|
||||
}
|
||||
|
||||
await context.info(f"Installing IDF tools: {' '.join(args[1:])}")
|
||||
|
||||
result = await self._run_idf_tools(args, timeout=600.0)
|
||||
if not result["success"]:
|
||||
return result
|
||||
|
||||
# Verify installation
|
||||
check = await self._run_idf_tools(["check"])
|
||||
check_status = _parse_tools_check(check["output"]) if check["success"] else {}
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"command": f"idf_tools.py {' '.join(args)}",
|
||||
"output_summary": result["output"][-500:] if result["output"] else "",
|
||||
"post_install_missing": check_status.get("missing", []),
|
||||
"post_install_installed_count": len(check_status.get("installed", [])),
|
||||
}
|
||||
|
||||
@self.app.tool("idf_env_info")
|
||||
async def idf_env_info(context: Context) -> dict[str, Any]:
|
||||
"""Get current ESP-IDF environment information.
|
||||
|
||||
Shows IDF_PATH, exported PATH additions, tool versions, and
|
||||
Python environment details. Useful for diagnosing build issues.
|
||||
"""
|
||||
result = await self._run_idf_tools(["export", "--format", "key-value"])
|
||||
if not result["success"]:
|
||||
return result
|
||||
|
||||
exported = _parse_export_vars(result["output"])
|
||||
|
||||
# Get IDF version
|
||||
idf_path = self.config.esp_idf_path
|
||||
version = "unknown"
|
||||
if idf_path:
|
||||
version_path = idf_path / "version.txt"
|
||||
if version_path.exists():
|
||||
version = version_path.read_text().strip()
|
||||
else:
|
||||
# Try git describe
|
||||
git_proc = None
|
||||
try:
|
||||
git_proc = await asyncio.create_subprocess_exec(
|
||||
"git", "describe", "--tags", "--always",
|
||||
cwd=str(idf_path),
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
stdout, _ = await asyncio.wait_for(git_proc.communicate(), timeout=5.0)
|
||||
if git_proc.returncode == 0:
|
||||
version = stdout.decode().strip()
|
||||
except Exception as exc:
|
||||
logger.debug(f"git describe failed: {exc}")
|
||||
finally:
|
||||
if git_proc and git_proc.returncode is None:
|
||||
git_proc.kill()
|
||||
await git_proc.wait()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"idf_path": str(idf_path),
|
||||
"idf_version": version,
|
||||
"exported_vars": exported,
|
||||
"path_additions": [
|
||||
p for p in exported.get("PATH", "").split(os.pathsep)
|
||||
if p and ".espressif" in p
|
||||
],
|
||||
}
|
||||
|
||||
def _register_tier2_tools(self) -> None:
|
||||
"""Project workflows -- gate checked at invocation time so they
|
||||
unlock after idf_tools_install without server restart."""
|
||||
|
||||
@self.app.tool("idf_build_project")
|
||||
async def idf_build_project(
|
||||
context: Context,
|
||||
project_path: str,
|
||||
target: str = "esp32",
|
||||
clean: bool = False,
|
||||
) -> dict[str, Any]:
|
||||
"""Build an ESP-IDF project.
|
||||
|
||||
Runs ``idf.py set-target`` then ``idf.py build`` for the given
|
||||
project directory. Requires a fully configured ESP-IDF environment.
|
||||
|
||||
Args:
|
||||
project_path: Path to ESP-IDF project directory
|
||||
(must contain CMakeLists.txt).
|
||||
target: Target chip (e.g. "esp32", "esp32s3", "esp32c3").
|
||||
clean: Run ``idf.py fullclean`` before building.
|
||||
"""
|
||||
try:
|
||||
_validate_target(target)
|
||||
except ValueError as e:
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
project = Path(project_path).expanduser().resolve()
|
||||
if not (project / "CMakeLists.txt").exists():
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"No CMakeLists.txt found in {project}",
|
||||
}
|
||||
|
||||
# Optional clean
|
||||
if clean:
|
||||
await context.info("Cleaning build directory")
|
||||
clean_result = await self._run_idf_py(
|
||||
["-C", str(project), "fullclean"], timeout=60.0,
|
||||
)
|
||||
if not clean_result["success"]:
|
||||
return clean_result
|
||||
|
||||
# Set target
|
||||
await context.info(f"Setting target to {target}")
|
||||
set_result = await self._run_idf_py(
|
||||
["-C", str(project), "set-target", target], timeout=120.0,
|
||||
)
|
||||
if not set_result["success"]:
|
||||
return set_result
|
||||
|
||||
# Build
|
||||
await context.info(f"Building project at {project}")
|
||||
build_result = await self._run_idf_py(
|
||||
["-C", str(project), "build"], timeout=600.0,
|
||||
)
|
||||
if not build_result["success"]:
|
||||
return build_result
|
||||
|
||||
output = build_result.get("output", "") + build_result.get("stderr", "")
|
||||
bin_path = project / "build" / f"{project.name}.bin"
|
||||
return {
|
||||
"success": True,
|
||||
"project": str(project),
|
||||
"target": target,
|
||||
"output_summary": output[-500:],
|
||||
"binary": str(bin_path) if bin_path.exists() else None,
|
||||
}
|
||||
|
||||
@self.app.tool("idf_flash_project")
|
||||
async def idf_flash_project(
|
||||
context: Context,
|
||||
project_path: str,
|
||||
port: str,
|
||||
baud: int = 460800,
|
||||
) -> dict[str, Any]:
|
||||
"""Flash a built ESP-IDF project to a device.
|
||||
|
||||
Runs ``idf.py flash`` for a previously built project.
|
||||
Requires a fully configured ESP-IDF environment.
|
||||
|
||||
Args:
|
||||
project_path: Path to ESP-IDF project directory (must be built).
|
||||
port: Serial port or socket URI for the target device.
|
||||
baud: Flash baud rate.
|
||||
"""
|
||||
if baud not in _VALID_BAUD_RATES:
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Invalid baud rate: {baud}. "
|
||||
f"Valid rates: {sorted(_VALID_BAUD_RATES)}",
|
||||
}
|
||||
|
||||
project = Path(project_path).expanduser().resolve()
|
||||
build_dir = project / "build"
|
||||
if not build_dir.exists():
|
||||
return {
|
||||
"success": False,
|
||||
"error": f"Build directory not found at {build_dir}. "
|
||||
"Run idf_build_project first.",
|
||||
}
|
||||
|
||||
await context.info(f"Flashing {project.name} to {port}")
|
||||
result = await self._run_idf_py(
|
||||
["-C", str(project), "-p", port, "-b", str(baud), "flash"],
|
||||
timeout=300.0,
|
||||
)
|
||||
if not result["success"]:
|
||||
return result
|
||||
|
||||
output = result.get("output", "") + result.get("stderr", "")
|
||||
return {
|
||||
"success": True,
|
||||
"project": str(project),
|
||||
"port": port,
|
||||
"output_summary": output[-500:],
|
||||
}
|
||||
|
||||
@self.app.tool("idf_monitor")
|
||||
async def idf_monitor(
|
||||
context: Context,
|
||||
port: str,
|
||||
project_path: str | None = None,
|
||||
duration: int = 10,
|
||||
) -> dict[str, Any]:
|
||||
"""Capture serial output from an ESP device using IDF monitor.
|
||||
|
||||
Runs ``idf.py monitor`` for a limited duration. If a project
|
||||
path is given, ELF-based address decoding is enabled for crash
|
||||
backtraces.
|
||||
|
||||
Args:
|
||||
port: Serial port or socket URI.
|
||||
project_path: Optional ESP-IDF project path for ELF decoding.
|
||||
duration: Capture duration in seconds (max 60, default 10).
|
||||
"""
|
||||
if not self.config.get_idf_available():
|
||||
return {
|
||||
"success": False,
|
||||
"error": "Full ESP-IDF environment not available",
|
||||
"hint": "Use idf_tools_check to see what's missing, "
|
||||
"then idf_tools_install to set up toolchains",
|
||||
}
|
||||
|
||||
duration = min(max(duration, 1), 60)
|
||||
|
||||
env = await self._build_idf_env()
|
||||
if not env:
|
||||
return {"success": False, "error": "Failed to export IDF environment"}
|
||||
|
||||
idf_path = self.config.esp_idf_path
|
||||
assert idf_path is not None # guaranteed by get_idf_available()
|
||||
idf_py = str(idf_path / "tools" / "idf.py")
|
||||
cmd = ["python3", idf_py, "-p", port, "monitor", "--no-reset"]
|
||||
if project_path:
|
||||
project = Path(project_path).expanduser().resolve()
|
||||
cmd = ["python3", idf_py, "-C", str(project), "-p", port, "monitor", "--no-reset"]
|
||||
|
||||
await context.info(f"Monitoring {port} for {duration}s")
|
||||
proc = None
|
||||
try:
|
||||
proc = await asyncio.create_subprocess_exec(
|
||||
*cmd, stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE, env=env,
|
||||
)
|
||||
stdout_bytes, _ = await asyncio.wait_for(
|
||||
proc.communicate(), timeout=float(duration)
|
||||
)
|
||||
output = (stdout_bytes or b"").decode(errors="replace")
|
||||
except asyncio.TimeoutError:
|
||||
if proc and proc.returncode is None:
|
||||
proc.terminate()
|
||||
try:
|
||||
stdout_bytes, _ = await asyncio.wait_for(
|
||||
proc.communicate(), timeout=5.0,
|
||||
)
|
||||
output = (stdout_bytes or b"").decode(errors="replace")
|
||||
except asyncio.TimeoutError:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
output = ""
|
||||
else:
|
||||
output = ""
|
||||
finally:
|
||||
if proc and proc.returncode is None:
|
||||
proc.kill()
|
||||
await proc.wait()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"port": port,
|
||||
"duration": duration,
|
||||
"output": output[-5000:],
|
||||
"lines": len(output.splitlines()),
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Resources
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _register_resources(self) -> None:
|
||||
|
||||
@self.app.resource("esp://idf/status")
|
||||
async def idf_status() -> dict[str, Any]:
|
||||
"""ESP-IDF installation status: version, installed/missing tools, target readiness."""
|
||||
idf_path = self.config.esp_idf_path
|
||||
if not idf_path:
|
||||
return {"available": False, "error": "No ESP-IDF path configured"}
|
||||
|
||||
# Version
|
||||
version = "unknown"
|
||||
version_path = idf_path / "version.txt"
|
||||
if version_path.exists():
|
||||
version = version_path.read_text().strip()
|
||||
|
||||
# Tool check
|
||||
result = await self._run_idf_tools(["check"])
|
||||
tools_status = _parse_tools_check(result["output"]) if result["success"] else {}
|
||||
|
||||
# Per-target readiness
|
||||
tools_json = await self._load_tools_json()
|
||||
target_readiness: dict[str, bool] = {}
|
||||
if tools_json:
|
||||
missing_set = set(tools_status.get("missing", []))
|
||||
for target in TARGET_ARCH:
|
||||
needed = self._tools_for_target(tools_json, target)
|
||||
needed_names = {t["name"] for t in needed}
|
||||
target_missing = [
|
||||
m for m in missing_set
|
||||
if any(m.startswith(n) for n in needed_names)
|
||||
]
|
||||
target_readiness[target] = len(target_missing) == 0
|
||||
|
||||
return {
|
||||
"available": True,
|
||||
"idf_path": str(idf_path),
|
||||
"idf_version": version,
|
||||
"idf_path_found": self.config.get_idf_path_found(),
|
||||
"idf_fully_available": self.config.get_idf_available(),
|
||||
"installed_tools": len(tools_status.get("installed", [])),
|
||||
"missing_tools": len(tools_status.get("missing", [])),
|
||||
"missing_tool_names": tools_status.get("missing", []),
|
||||
"target_readiness": target_readiness,
|
||||
}
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Prompts
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _register_prompts(self) -> None:
|
||||
|
||||
@self.app.prompt("idf_setup_target")
|
||||
async def idf_setup_target(target: str) -> str:
|
||||
"""Generate a guided setup showing what's needed for a specific ESP target.
|
||||
|
||||
Args:
|
||||
target: ESP target chip (e.g. "esp32p4", "esp32c3").
|
||||
"""
|
||||
arch = TARGET_ARCH.get(target, "unknown")
|
||||
lines = [
|
||||
f"# ESP-IDF Setup for {target} ({arch} architecture)\n",
|
||||
]
|
||||
|
||||
idf_path = self.config.esp_idf_path
|
||||
if not idf_path:
|
||||
lines.append("ESP-IDF is not installed. Install it first:")
|
||||
lines.append(" https://docs.espressif.com/projects/esp-idf/en/stable/get-started/")
|
||||
return "\n".join(lines)
|
||||
|
||||
lines.append(f"ESP-IDF path: `{idf_path}`\n")
|
||||
|
||||
# Check tools
|
||||
result = await self._run_idf_tools(["check"])
|
||||
if result["success"]:
|
||||
status = _parse_tools_check(result["output"])
|
||||
tools_json = await self._load_tools_json()
|
||||
|
||||
if tools_json:
|
||||
needed = self._tools_for_target(tools_json, target)
|
||||
needed_names = {t["name"] for t in needed}
|
||||
missing = [
|
||||
m for m in status["missing"]
|
||||
if any(m.startswith(n) for n in needed_names)
|
||||
]
|
||||
|
||||
lines.append(f"## Tools required for {target}\n")
|
||||
for t in needed:
|
||||
name = t["name"]
|
||||
is_missing = any(m.startswith(name) for m in missing)
|
||||
marker = "MISSING" if is_missing else "installed"
|
||||
lines.append(f"- **{name}** v{t['version']} [{marker}]")
|
||||
|
||||
if missing:
|
||||
lines.append("\n## Install missing tools\n")
|
||||
lines.append(
|
||||
f"Use the `idf_tools_install` tool with "
|
||||
f"`targets=[\"{target}\"]` to install "
|
||||
f"{len(missing)} missing tool(s).\n"
|
||||
)
|
||||
lines.append(
|
||||
f"Or from the command line:\n"
|
||||
f"```\n"
|
||||
f"cd {idf_path}\n"
|
||||
f"python3 tools/idf_tools.py install --targets={target}\n"
|
||||
f"```"
|
||||
)
|
||||
else:
|
||||
lines.append(f"\nAll tools for {target} are installed. Ready to build.")
|
||||
else:
|
||||
lines.append(f"Could not check tools: {result.get('error', 'unknown error')}")
|
||||
|
||||
return "\n".join(lines)
|
||||
@ -135,7 +135,7 @@ class ESPToolServerConfig:
|
||||
]
|
||||
|
||||
for path in common_paths:
|
||||
if path.exists() and (path / "idf.py").exists():
|
||||
if path.exists() and (path / "tools" / "idf.py").exists():
|
||||
self.esp_idf_path = path
|
||||
logger.info(f"Auto-detected ESP-IDF at: {path}")
|
||||
break
|
||||
@ -269,12 +269,18 @@ class ESPToolServerConfig:
|
||||
default_dir.mkdir(exist_ok=True)
|
||||
return default_dir
|
||||
|
||||
def get_idf_path_found(self) -> bool:
|
||||
"""Check if ESP-IDF directory exists with idf_tools.py (may lack toolchains)"""
|
||||
if not self.esp_idf_path:
|
||||
return False
|
||||
return (self.esp_idf_path / "tools" / "idf_tools.py").exists()
|
||||
|
||||
def get_idf_available(self) -> bool:
|
||||
"""Check if ESP-IDF is available"""
|
||||
"""Check if ESP-IDF is fully available (directory + idf.py entry point)"""
|
||||
if not self.esp_idf_path:
|
||||
return False
|
||||
|
||||
return self.esp_idf_path.exists() and (self.esp_idf_path / "idf.py").exists()
|
||||
return self.esp_idf_path.exists() and (self.esp_idf_path / "tools" / "idf.py").exists()
|
||||
|
||||
def get_qemu_available(self) -> bool:
|
||||
"""Check if at least one QEMU binary is available"""
|
||||
@ -310,6 +316,7 @@ class ESPToolServerConfig:
|
||||
"enable_stub_flasher": self.enable_stub_flasher,
|
||||
"max_concurrent_operations": self.max_concurrent_operations,
|
||||
"production_mode": self.production_mode,
|
||||
"idf_path_found": self.get_idf_path_found(),
|
||||
"idf_available": self.get_idf_available(),
|
||||
"qemu_available": self.get_qemu_available(),
|
||||
"qemu_xtensa_path": self.qemu_xtensa_path,
|
||||
|
||||
@ -20,6 +20,7 @@ from .components import (
|
||||
Diagnostics,
|
||||
FirmwareBuilder,
|
||||
FlashManager,
|
||||
IDFIntegration,
|
||||
OTAManager,
|
||||
PartitionManager,
|
||||
ProductCatalog,
|
||||
@ -92,12 +93,13 @@ class ESPToolServer:
|
||||
self.components["qemu_manager"] = QemuManager(self.app, self.config)
|
||||
logger.info("✅ QEMU emulation enabled")
|
||||
|
||||
# ESP-IDF integration (if available)
|
||||
if self.config.get_idf_available():
|
||||
from .components.idf_integration import IDFIntegration
|
||||
|
||||
# ESP-IDF integration (if IDF directory found with idf_tools.py)
|
||||
if self.config.get_idf_path_found():
|
||||
self.components["idf_integration"] = IDFIntegration(self.app, self.config)
|
||||
logger.info("✅ ESP-IDF integration enabled")
|
||||
if self.config.get_idf_available():
|
||||
logger.info("✅ ESP-IDF integration enabled (full environment)")
|
||||
else:
|
||||
logger.info("✅ ESP-IDF toolchain management enabled (project workflows pending)")
|
||||
|
||||
# Cross-wire: let ChipControl see QEMU instances for scan integration
|
||||
if "qemu_manager" in self.components:
|
||||
@ -139,6 +141,7 @@ class ESPToolServer:
|
||||
"security_features": True,
|
||||
"ota_updates": True,
|
||||
"factory_programming": True,
|
||||
"idf_toolchain_management": self.config.get_idf_path_found(),
|
||||
"host_applications": self.config.get_idf_available(),
|
||||
"qemu_emulation": self.config.get_qemu_available(),
|
||||
},
|
||||
@ -203,9 +206,16 @@ class ESPToolServer:
|
||||
"esp_qemu_flash",
|
||||
]
|
||||
|
||||
if self.config.get_idf_path_found():
|
||||
tool_categories["idf_tools"] = [
|
||||
"idf_tools_list",
|
||||
"idf_tools_check",
|
||||
"idf_tools_install",
|
||||
"idf_env_info",
|
||||
]
|
||||
|
||||
if self.config.get_idf_available():
|
||||
tool_categories["esp_idf"] = [
|
||||
"idf_create_host_project",
|
||||
"idf_build_project",
|
||||
"idf_flash_project",
|
||||
"idf_monitor",
|
||||
@ -336,6 +346,7 @@ class ESPToolServer:
|
||||
"performance_profiling",
|
||||
"diagnostic_reports",
|
||||
],
|
||||
"idf_path_found": self.config.get_idf_path_found(),
|
||||
"esp_idf_integration": self.config.get_idf_available(),
|
||||
"host_applications": self.config.get_idf_available(),
|
||||
"qemu_emulation": self.config.get_qemu_available(),
|
||||
@ -372,8 +383,11 @@ class ESPToolServer:
|
||||
if self.config.get_qemu_available():
|
||||
logger.info("🖥️ QEMU emulation available")
|
||||
|
||||
if self.config.get_idf_available():
|
||||
logger.info("🏗️ ESP-IDF integration available")
|
||||
if self.config.get_idf_path_found():
|
||||
if self.config.get_idf_available():
|
||||
logger.info("🏗️ ESP-IDF integration available (full environment)")
|
||||
else:
|
||||
logger.info("🔧 ESP-IDF found (toolchain management only)")
|
||||
|
||||
if self.config.production_mode:
|
||||
logger.info("🏭 Running in production mode")
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user