""" Bill of Materials (BOM) processing tools for KiCad projects. """ import csv import json import os from typing import Any from fastmcp import FastMCP import pandas as pd from kicad_mcp.utils.file_utils import get_project_files def register_bom_tools(mcp: FastMCP) -> None: """Register BOM-related tools with the MCP server. Args: mcp: The FastMCP server instance """ @mcp.tool() def analyze_bom(project_path: str) -> dict[str, Any]: """Analyze a KiCad project's Bill of Materials. This tool will look for BOM files related to a KiCad project and provide analysis including component counts, categories, and cost estimates if available. Args: project_path: Path to the KiCad project file (.kicad_pro) ctx: MCP context for progress reporting Returns: Dictionary with BOM analysis results """ print(f"Analyzing BOM for project: {project_path}") if not os.path.exists(project_path): print(f"Project not found: {project_path}") return {"success": False, "error": f"Project not found: {project_path}"} # Report progress # Get all project files files = get_project_files(project_path) # Look for BOM files bom_files = {} for file_type, file_path in files.items(): if "bom" in file_type.lower() or file_path.lower().endswith(".csv"): bom_files[file_type] = file_path print(f"Found potential BOM file: {file_path}") if not bom_files: print("No BOM files found for project") return { "success": False, "error": "No BOM files found. Export a BOM from KiCad first.", "project_path": project_path, } # Analyze each BOM file results = { "success": True, "project_path": project_path, "bom_files": {}, "component_summary": {}, } total_unique_components = 0 total_components = 0 for file_type, file_path in bom_files.items(): try: # Parse the BOM file bom_data, format_info = parse_bom_file(file_path) if not bom_data or len(bom_data) == 0: print(f"Failed to parse BOM file: {file_path}") continue # Analyze the BOM data analysis = analyze_bom_data(bom_data, format_info) # Add to results results["bom_files"][file_type] = { "path": file_path, "format": format_info, "analysis": analysis, } # Update totals total_unique_components += analysis["unique_component_count"] total_components += analysis["total_component_count"] print(f"Successfully analyzed BOM file: {file_path}") except Exception as e: print(f"Error analyzing BOM file {file_path}: {str(e)}", exc_info=True) results["bom_files"][file_type] = {"path": file_path, "error": str(e)} # Generate overall component summary if total_components > 0: results["component_summary"] = { "total_unique_components": total_unique_components, "total_components": total_components, } # Calculate component categories across all BOMs all_categories = {} for file_type, file_info in results["bom_files"].items(): if "analysis" in file_info and "categories" in file_info["analysis"]: for category, count in file_info["analysis"]["categories"].items(): if category not in all_categories: all_categories[category] = 0 all_categories[category] += count results["component_summary"]["categories"] = all_categories # Calculate total cost if available total_cost = 0.0 cost_available = False for file_type, file_info in results["bom_files"].items(): if "analysis" in file_info and "total_cost" in file_info["analysis"]: if file_info["analysis"]["total_cost"] > 0: total_cost += file_info["analysis"]["total_cost"] cost_available = True if cost_available: results["component_summary"]["total_cost"] = round(total_cost, 2) currency = next( ( file_info["analysis"].get("currency", "USD") for file_type, file_info in results["bom_files"].items() if "analysis" in file_info and "currency" in file_info["analysis"] ), "USD", ) results["component_summary"]["currency"] = currency return results @mcp.tool() def export_bom_csv(project_path: str) -> dict[str, Any]: """Export a Bill of Materials for a KiCad project. This tool attempts to generate a CSV BOM file for a KiCad project. It requires KiCad to be installed with the appropriate command-line tools. Args: project_path: Path to the KiCad project file (.kicad_pro) ctx: MCP context for progress reporting Returns: Dictionary with export results """ print(f"Exporting BOM for project: {project_path}") if not os.path.exists(project_path): print(f"Project not found: {project_path}") return {"success": False, "error": f"Project not found: {project_path}"} # For now, disable Python modules and use CLI only kicad_modules_available = False # Report progress # Get all project files files = get_project_files(project_path) # We need the schematic file to generate a BOM if "schematic" not in files: print("Schematic file not found in project") return {"success": False, "error": "Schematic file not found"} schematic_file = files["schematic"] project_dir = os.path.dirname(project_path) project_name = os.path.basename(project_path)[:-10] # Remove .kicad_pro extension # Try to export BOM # This will depend on KiCad's command-line tools or Python modules export_result = {"success": False} if kicad_modules_available: try: # Try to use KiCad Python modules export_result = {"success": False, "error": "Python method disabled"} except Exception as e: print(f"Error exporting BOM with Python modules: {str(e)}", exc_info=True) export_result = {"success": False, "error": str(e)} # If Python method failed, try command-line method if not export_result.get("success", False): try: export_result = {"success": False, "error": "CLI method needs sync implementation"} except Exception as e: print(f"Error exporting BOM with CLI: {str(e)}", exc_info=True) export_result = {"success": False, "error": str(e)} if export_result.get("success", False): print(f"BOM exported successfully to {export_result.get('output_file', 'unknown location')}") else: print(f"Failed to export BOM: {export_result.get('error', 'Unknown error')}") return export_result # Helper functions for BOM processing def parse_bom_file(file_path: str) -> tuple[list[dict[str, Any]], dict[str, Any]]: """Parse a BOM file and detect its format. Args: file_path: Path to the BOM file Returns: Tuple containing: - List of component dictionaries - Dictionary with format information """ print(f"Parsing BOM file: {file_path}") # Check file extension _, ext = os.path.splitext(file_path) ext = ext.lower() # Dictionary to store format detection info format_info = {"file_type": ext, "detected_format": "unknown", "header_fields": []} # Empty list to store component data components = [] try: if ext == ".csv": # Try to parse as CSV with open(file_path, encoding="utf-8-sig") as f: # Read a few lines to analyze the format sample = "".join([f.readline() for _ in range(10)]) f.seek(0) # Reset file pointer # Try to detect the delimiter if "," in sample: delimiter = "," elif ";" in sample: delimiter = ";" elif "\t" in sample: delimiter = "\t" else: delimiter = "," # Default format_info["delimiter"] = delimiter # Read CSV reader = csv.DictReader(f, delimiter=delimiter) format_info["header_fields"] = reader.fieldnames if reader.fieldnames else [] # Detect BOM format based on header fields header_str = ",".join(format_info["header_fields"]).lower() if "reference" in header_str and "value" in header_str: format_info["detected_format"] = "kicad" elif "designator" in header_str: format_info["detected_format"] = "altium" elif "part number" in header_str or "manufacturer part" in header_str: format_info["detected_format"] = "generic" # Read components for row in reader: components.append(dict(row)) elif ext == ".xml": # Basic XML parsing with security protection from defusedxml.ElementTree import parse as safe_parse tree = safe_parse(file_path) root = tree.getroot() format_info["detected_format"] = "xml" # Try to extract components based on common XML BOM formats component_elements = root.findall(".//component") or root.findall(".//Component") if component_elements: for elem in component_elements: component = {} for attr in elem.attrib: component[attr] = elem.attrib[attr] for child in elem: component[child.tag] = child.text components.append(component) elif ext == ".json": # Parse JSON with open(file_path) as f: data = json.load(f) format_info["detected_format"] = "json" # Try to find components array in common JSON formats if isinstance(data, list): components = data elif "components" in data: components = data["components"] elif "parts" in data: components = data["parts"] else: # Unknown format, try generic CSV parsing as fallback try: with open(file_path, encoding="utf-8-sig") as f: reader = csv.DictReader(f) format_info["header_fields"] = reader.fieldnames if reader.fieldnames else [] format_info["detected_format"] = "unknown_csv" for row in reader: components.append(dict(row)) except: print(f"Failed to parse unknown file format: {file_path}") return [], {"detected_format": "unsupported"} except Exception as e: print(f"Error parsing BOM file: {str(e)}", exc_info=True) return [], {"error": str(e)} # Check if we actually got components if not components: print(f"No components found in BOM file: {file_path}") else: print(f"Successfully parsed {len(components)} components from {file_path}") # Add a sample of the fields found if components: format_info["sample_fields"] = list(components[0].keys()) return components, format_info def analyze_bom_data( components: list[dict[str, Any]], format_info: dict[str, Any] ) -> dict[str, Any]: """Analyze component data from a BOM file. Args: components: List of component dictionaries format_info: Dictionary with format information Returns: Dictionary with analysis results """ print(f"Analyzing {len(components)} components") # Initialize results results = { "unique_component_count": 0, "total_component_count": 0, "categories": {}, "has_cost_data": False, } if not components: return results # Try to convert to pandas DataFrame for easier analysis try: df = pd.DataFrame(components) # Clean up column names df.columns = [str(col).strip().lower() for col in df.columns] # Try to identify key columns based on format ref_col = None value_col = None quantity_col = None footprint_col = None cost_col = None category_col = None # Check for reference designator column for possible_col in [ "reference", "designator", "references", "designators", "refdes", "ref", ]: if possible_col in df.columns: ref_col = possible_col break # Check for value column for possible_col in ["value", "component", "comp", "part", "component value", "comp value"]: if possible_col in df.columns: value_col = possible_col break # Check for quantity column for possible_col in ["quantity", "qty", "count", "amount"]: if possible_col in df.columns: quantity_col = possible_col break # Check for footprint column for possible_col in ["footprint", "package", "pattern", "pcb footprint"]: if possible_col in df.columns: footprint_col = possible_col break # Check for cost column for possible_col in ["cost", "price", "unit price", "unit cost", "cost each"]: if possible_col in df.columns: cost_col = possible_col break # Check for category column for possible_col in ["category", "type", "group", "component type", "lib"]: if possible_col in df.columns: category_col = possible_col break # Count total components if quantity_col: # Try to convert quantity to numeric df[quantity_col] = pd.to_numeric(df[quantity_col], errors="coerce").fillna(1) results["total_component_count"] = int(df[quantity_col].sum()) else: # If no quantity column, assume each row is one component results["total_component_count"] = len(df) # Count unique components results["unique_component_count"] = len(df) # Calculate categories if category_col: # Use provided category column categories = df[category_col].value_counts().to_dict() results["categories"] = {str(k): int(v) for k, v in categories.items()} elif footprint_col: # Use footprint as category categories = df[footprint_col].value_counts().to_dict() results["categories"] = {str(k): int(v) for k, v in categories.items()} elif ref_col: # Try to extract categories from reference designators (R=resistor, C=capacitor, etc.) def extract_prefix(ref): if isinstance(ref, str): import re match = re.match(r"^([A-Za-z]+)", ref) if match: return match.group(1) return "Other" if isinstance(df[ref_col].iloc[0], str) and "," in df[ref_col].iloc[0]: # Multiple references in one cell all_refs = [] for refs in df[ref_col]: all_refs.extend([r.strip() for r in refs.split(",")]) categories = {} for ref in all_refs: prefix = extract_prefix(ref) categories[prefix] = categories.get(prefix, 0) + 1 results["categories"] = categories else: # Single reference per row categories = df[ref_col].apply(extract_prefix).value_counts().to_dict() results["categories"] = {str(k): int(v) for k, v in categories.items()} # Map common reference prefixes to component types category_mapping = { "R": "Resistors", "C": "Capacitors", "L": "Inductors", "D": "Diodes", "Q": "Transistors", "U": "ICs", "SW": "Switches", "J": "Connectors", "K": "Relays", "Y": "Crystals/Oscillators", "F": "Fuses", "T": "Transformers", } mapped_categories = {} for cat, count in results["categories"].items(): if cat in category_mapping: mapped_name = category_mapping[cat] mapped_categories[mapped_name] = mapped_categories.get(mapped_name, 0) + count else: mapped_categories[cat] = count results["categories"] = mapped_categories # Calculate cost if available if cost_col: try: # Try to extract numeric values from cost field df[cost_col] = df[cost_col].astype(str).str.replace("$", "").str.replace(",", "") df[cost_col] = pd.to_numeric(df[cost_col], errors="coerce") # Remove NaN values df_with_cost = df.dropna(subset=[cost_col]) if not df_with_cost.empty: results["has_cost_data"] = True if quantity_col: total_cost = (df_with_cost[cost_col] * df_with_cost[quantity_col]).sum() else: total_cost = df_with_cost[cost_col].sum() results["total_cost"] = round(float(total_cost), 2) # Try to determine currency # Check first row that has cost for currency symbols for _, row in df.iterrows(): cost_str = str(row.get(cost_col, "")) if "$" in cost_str: results["currency"] = "USD" break elif "€" in cost_str: results["currency"] = "EUR" break elif "£" in cost_str: results["currency"] = "GBP" break if "currency" not in results: results["currency"] = "USD" # Default except: print("Failed to parse cost data") # Add extra insights if ref_col and value_col: # Check for common components by value value_counts = df[value_col].value_counts() most_common = value_counts.head(5).to_dict() results["most_common_values"] = {str(k): int(v) for k, v in most_common.items()} except Exception as e: print(f"Error analyzing BOM data: {str(e)}", exc_info=True) # Fallback to basic analysis results["unique_component_count"] = len(components) results["total_component_count"] = len(components) return results async def export_bom_with_python( schematic_file: str, output_dir: str, project_name: str ) -> dict[str, Any]: """Export a BOM using KiCad Python modules. Args: schematic_file: Path to the schematic file output_dir: Directory to save the BOM project_name: Name of the project ctx: MCP context for progress reporting Returns: Dictionary with export results """ print(f"Exporting BOM for schematic: {schematic_file}") try: # Try to import KiCad Python modules # This is a placeholder since exporting BOMs from schematic files # is complex and KiCad's API for this is not well-documented import kicad import kicad.pcbnew # For now, return a message indicating this method is not implemented yet print("BOM export with Python modules not fully implemented") return { "success": False, "error": "BOM export using Python modules is not fully implemented yet. Try using the command-line method.", "schematic_file": schematic_file, } except ImportError: print("Failed to import KiCad Python modules") return { "success": False, "error": "Failed to import KiCad Python modules", "schematic_file": schematic_file, } async def export_bom_with_cli( schematic_file: str, output_dir: str, project_name: str ) -> dict[str, Any]: """Export a BOM using KiCad command-line tools. Args: schematic_file: Path to the schematic file output_dir: Directory to save the BOM project_name: Name of the project ctx: MCP context for progress reporting Returns: Dictionary with export results """ import platform import subprocess system = platform.system() print(f"Exporting BOM using CLI tools on {system}") # Output file path output_file = os.path.join(output_dir, f"{project_name}_bom.csv") # Define the command based on operating system if system == "Darwin": # macOS from kicad_mcp.config import KICAD_APP_PATH # Path to KiCad command-line tools on macOS kicad_cli = os.path.join(KICAD_APP_PATH, "Contents/MacOS/kicad-cli") if not os.path.exists(kicad_cli): return { "success": False, "error": f"KiCad CLI tool not found at {kicad_cli}", "schematic_file": schematic_file, } # Command to generate BOM cmd = [kicad_cli, "sch", "export", "bom", "--output", output_file, schematic_file] elif system == "Windows": from kicad_mcp.config import KICAD_APP_PATH # Path to KiCad command-line tools on Windows kicad_cli = os.path.join(KICAD_APP_PATH, "bin", "kicad-cli.exe") if not os.path.exists(kicad_cli): return { "success": False, "error": f"KiCad CLI tool not found at {kicad_cli}", "schematic_file": schematic_file, } # Command to generate BOM cmd = [kicad_cli, "sch", "export", "bom", "--output", output_file, schematic_file] elif system == "Linux": # Assume kicad-cli is in the PATH kicad_cli = "kicad-cli" # Command to generate BOM cmd = [kicad_cli, "sch", "export", "bom", "--output", output_file, schematic_file] else: return { "success": False, "error": f"Unsupported operating system: {system}", "schematic_file": schematic_file, } try: print(f"Running command: {' '.join(cmd)}") # Run the command process = subprocess.run(cmd, capture_output=True, text=True, timeout=30) # Check if the command was successful if process.returncode != 0: print(f"BOM export command failed with code {process.returncode}") print(f"Error output: {process.stderr}") return { "success": False, "error": f"BOM export command failed: {process.stderr}", "schematic_file": schematic_file, "command": " ".join(cmd), } # Check if the output file was created if not os.path.exists(output_file): return { "success": False, "error": "BOM file was not created", "schematic_file": schematic_file, "output_file": output_file, } # Read the first few lines of the BOM to verify it's valid with open(output_file) as f: bom_content = f.read(1024) # Read first 1KB if len(bom_content.strip()) == 0: return { "success": False, "error": "Generated BOM file is empty", "schematic_file": schematic_file, "output_file": output_file, } return { "success": True, "schematic_file": schematic_file, "output_file": output_file, "file_size": os.path.getsize(output_file), "message": "BOM exported successfully", } except subprocess.TimeoutExpired: print("BOM export command timed out after 30 seconds") return { "success": False, "error": "BOM export command timed out after 30 seconds", "schematic_file": schematic_file, } except Exception as e: print(f"Error exporting BOM: {str(e)}", exc_info=True) return { "success": False, "error": f"Error exporting BOM: {str(e)}", "schematic_file": schematic_file, }