""" Automated Routing Tools for KiCad MCP Server Provides MCP tools for automated PCB routing using FreeRouting integration and KiCad IPC API for real-time routing operations and optimization. """ import logging from typing import Any from fastmcp import FastMCP from kicad_mcp.utils.file_utils import get_project_files from kicad_mcp.utils.freerouting_engine import FreeRoutingEngine, check_routing_prerequisites from kicad_mcp.utils.ipc_client import ( check_kicad_availability, kicad_ipc_session, ) logger = logging.getLogger(__name__) def register_routing_tools(mcp: FastMCP) -> None: """Register automated routing tools with the MCP server.""" @mcp.tool() def check_routing_capability() -> dict[str, Any]: """ Check if automated routing is available and working. Verifies that all components needed for automated routing are installed and properly configured, including KiCad IPC API, FreeRouting, and KiCad CLI. Returns: Dictionary with capability status and component details """ try: status = check_routing_prerequisites() return { "success": True, "routing_available": status["overall_ready"], "message": status["message"], "component_status": status["components"], "capabilities": { "automated_routing": status["overall_ready"], "interactive_placement": status["components"].get("kicad_ipc", {}).get("available", False), "optimization": status["overall_ready"], "real_time_updates": status["components"].get("kicad_ipc", {}).get("available", False) } } except Exception as e: return { "success": False, "error": str(e), "routing_available": False } @mcp.tool() def route_pcb_automatically( project_path: str, routing_strategy: str = "balanced", preserve_existing: bool = False, optimization_level: str = "standard" ) -> dict[str, Any]: """ Perform automated PCB routing using FreeRouting. Takes a KiCad PCB with placed components and automatically routes all connections using the FreeRouting autorouter with optimized parameters. Args: project_path: Path to KiCad project file (.kicad_pro) routing_strategy: Routing approach ("conservative", "balanced", "aggressive") preserve_existing: Whether to preserve existing routing optimization_level: Post-routing optimization ("none", "standard", "aggressive") Returns: Dictionary with routing results and statistics Examples: route_pcb_automatically("/path/to/project.kicad_pro") route_pcb_automatically("/path/to/project.kicad_pro", "aggressive", optimization_level="aggressive") """ try: # Get project files files = get_project_files(project_path) if "pcb" not in files: return { "success": False, "error": "PCB file not found in project" } board_path = files["pcb"] # Configure routing parameters based on strategy routing_configs = { "conservative": { "via_costs": 30, "start_ripup_costs": 50, "max_iterations": 500, "automatic_neckdown": False, "postroute_optimization": optimization_level != "none" }, "balanced": { "via_costs": 50, "start_ripup_costs": 100, "max_iterations": 1000, "automatic_neckdown": True, "postroute_optimization": optimization_level != "none" }, "aggressive": { "via_costs": 80, "start_ripup_costs": 200, "max_iterations": 2000, "automatic_neckdown": True, "postroute_optimization": True } } config = routing_configs.get(routing_strategy, routing_configs["balanced"]) # Add optimization settings if optimization_level == "aggressive": config.update({ "improvement_threshold": 0.005, # More aggressive optimization "max_iterations": config["max_iterations"] * 2 }) # Initialize FreeRouting engine engine = FreeRoutingEngine() # Check if FreeRouting is available availability = engine.check_freerouting_availability() if not availability["available"]: return { "success": False, "error": f"FreeRouting not available: {availability['message']}", "routing_strategy": routing_strategy } # Perform automated routing result = engine.route_board_complete( board_path, routing_config=config, preserve_existing=preserve_existing ) # Add strategy info to result result.update({ "routing_strategy": routing_strategy, "optimization_level": optimization_level, "project_path": project_path, "board_path": board_path }) return result except Exception as e: logger.error(f"Error in automated routing: {e}") return { "success": False, "error": str(e), "project_path": project_path, "routing_strategy": routing_strategy } @mcp.tool() def optimize_component_placement( project_path: str, optimization_goals: list[str] = None, placement_strategy: str = "thermal_aware" ) -> dict[str, Any]: """ Optimize component placement for better routing and performance. Uses KiCad IPC API to analyze and optimize component placement based on thermal, signal integrity, and routing efficiency considerations. Args: project_path: Path to KiCad project file (.kicad_pro) optimization_goals: List of goals ("thermal", "signal_integrity", "routing_density", "manufacturability") placement_strategy: Strategy for placement ("thermal_aware", "signal_integrity", "compact", "spread") Returns: Dictionary with placement optimization results """ try: if not optimization_goals: optimization_goals = ["thermal", "signal_integrity", "routing_density"] # Get project files files = get_project_files(project_path) if "pcb" not in files: return { "success": False, "error": "PCB file not found in project" } board_path = files["pcb"] # Check KiCad IPC availability ipc_status = check_kicad_availability() if not ipc_status["available"]: return { "success": False, "error": f"KiCad IPC not available: {ipc_status['message']}" } with kicad_ipc_session(board_path=board_path) as client: # Get current component placement footprints = client.get_footprints() board_stats = client.get_board_statistics() # Analyze current placement placement_analysis = _analyze_component_placement( footprints, optimization_goals, placement_strategy ) # Generate optimization suggestions optimizations = _generate_placement_optimizations( footprints, placement_analysis, optimization_goals ) # Apply optimizations if any are found applied_changes = [] if optimizations.get("component_moves"): for move in optimizations["component_moves"]: success = client.move_footprint( move["reference"], move["new_position"] ) if success: applied_changes.append(move) if optimizations.get("component_rotations"): for rotation in optimizations["component_rotations"]: success = client.rotate_footprint( rotation["reference"], rotation["new_angle"] ) if success: applied_changes.append(rotation) # Save changes if any were made if applied_changes: client.save_board() return { "success": True, "project_path": project_path, "optimization_goals": optimization_goals, "placement_strategy": placement_strategy, "analysis": placement_analysis, "suggested_optimizations": optimizations, "applied_changes": applied_changes, "board_statistics": board_stats, "summary": f"Applied {len(applied_changes)} placement optimizations" } except Exception as e: logger.error(f"Error in placement optimization: {e}") return { "success": False, "error": str(e), "project_path": project_path } @mcp.tool() def analyze_routing_quality(project_path: str) -> dict[str, Any]: """ Analyze PCB routing quality and identify potential issues. Examines the current routing for signal integrity issues, thermal problems, manufacturability concerns, and optimization opportunities. Args: project_path: Path to KiCad project file (.kicad_pro) Returns: Dictionary with routing quality analysis and recommendations """ try: # Get project files files = get_project_files(project_path) if "pcb" not in files: return { "success": False, "error": "PCB file not found in project" } board_path = files["pcb"] with kicad_ipc_session(board_path=board_path) as client: # Get routing information tracks = client.get_tracks() nets = client.get_nets() footprints = client.get_footprints() connectivity = client.check_connectivity() # Analyze routing quality quality_analysis = { "connectivity_analysis": connectivity, "routing_density": _analyze_routing_density(tracks, footprints), "via_analysis": _analyze_via_usage(tracks), "trace_analysis": _analyze_trace_characteristics(tracks), "signal_integrity": _analyze_signal_integrity(tracks, nets), "thermal_analysis": _analyze_thermal_aspects(tracks, footprints), "manufacturability": _analyze_manufacturability(tracks) } # Generate overall quality score quality_score = _calculate_quality_score(quality_analysis) # Generate recommendations recommendations = _generate_routing_recommendations(quality_analysis) return { "success": True, "project_path": project_path, "quality_score": quality_score, "analysis": quality_analysis, "recommendations": recommendations, "summary": f"Routing quality score: {quality_score}/100" } except Exception as e: logger.error(f"Error in routing quality analysis: {e}") return { "success": False, "error": str(e), "project_path": project_path } @mcp.tool() def interactive_routing_session( project_path: str, net_name: str, routing_mode: str = "guided" ) -> dict[str, Any]: """ Start an interactive routing session for specific nets. Provides guided routing assistance using KiCad IPC API for real-time feedback and optimization suggestions during manual routing. Args: project_path: Path to KiCad project file (.kicad_pro) net_name: Name of the net to route (or "all" for all unrouted nets) routing_mode: Mode for routing assistance ("guided", "automatic", "manual") Returns: Dictionary with interactive routing session information """ try: # Get project files files = get_project_files(project_path) if "pcb" not in files: return { "success": False, "error": "PCB file not found in project" } board_path = files["pcb"] with kicad_ipc_session(board_path=board_path) as client: # Get net information if net_name == "all": connectivity = client.check_connectivity() target_nets = connectivity.get("routed_net_names", []) unrouted_nets = [] all_nets = client.get_nets() for net in all_nets: if net.name and net.name not in target_nets: unrouted_nets.append(net.name) session_info = { "session_type": "multi_net", "target_nets": unrouted_nets[:10], # Limit to first 10 "total_unrouted": len(unrouted_nets) } else: net = client.get_net_by_name(net_name) if not net: return { "success": False, "error": f"Net '{net_name}' not found in board" } session_info = { "session_type": "single_net", "target_net": net_name, "net_details": { "name": net.name, "code": getattr(net, 'code', 'unknown') } } # Analyze routing constraints and provide guidance routing_guidance = _generate_routing_guidance( client, session_info, routing_mode ) return { "success": True, "project_path": project_path, "session_info": session_info, "routing_mode": routing_mode, "guidance": routing_guidance, "instructions": [ "Use KiCad's interactive routing tools to route the specified nets", "Refer to the guidance for optimal routing strategies", "The board will be monitored for real-time feedback" ] } except Exception as e: logger.error(f"Error starting interactive routing session: {e}") return { "success": False, "error": str(e), "project_path": project_path } @mcp.tool() def route_specific_nets( project_path: str, net_names: list[str], routing_priority: str = "signal_integrity" ) -> dict[str, Any]: """ Route specific nets with targeted strategies. Routes only the specified nets using appropriate strategies based on signal type and routing requirements. Args: project_path: Path to KiCad project file (.kicad_pro) net_names: List of net names to route routing_priority: Priority for routing ("signal_integrity", "density", "thermal") Returns: Dictionary with specific net routing results """ try: # Get project files files = get_project_files(project_path) if "pcb" not in files: return { "success": False, "error": "PCB file not found in project" } board_path = files["pcb"] with kicad_ipc_session(board_path=board_path) as client: # Validate nets exist all_nets = {net.name: net for net in client.get_nets()} valid_nets = [] invalid_nets = [] for net_name in net_names: if net_name in all_nets: valid_nets.append(net_name) else: invalid_nets.append(net_name) if not valid_nets: return { "success": False, "error": f"None of the specified nets found: {net_names}", "invalid_nets": invalid_nets } # Clear existing routing for specified nets cleared_nets = [] for net_name in valid_nets: if client.delete_tracks_by_net(net_name): cleared_nets.append(net_name) # Configure routing for specific nets net_specific_config = _get_net_specific_routing_config( valid_nets, routing_priority ) # Use FreeRouting with net-specific configuration engine = FreeRoutingEngine() result = engine.route_board_complete( board_path, routing_config=net_specific_config ) # Analyze results for specified nets if result["success"]: net_results = _analyze_net_routing_results( client, valid_nets, result ) else: net_results = {"error": "Routing failed"} return { "success": result["success"], "project_path": project_path, "requested_nets": net_names, "valid_nets": valid_nets, "invalid_nets": invalid_nets, "cleared_nets": cleared_nets, "routing_priority": routing_priority, "routing_result": result, "net_specific_results": net_results } except Exception as e: logger.error(f"Error routing specific nets: {e}") return { "success": False, "error": str(e), "project_path": project_path, "net_names": net_names } # Helper functions for component placement optimization def _analyze_component_placement(footprints, goals, strategy): """Analyze current component placement for optimization opportunities.""" analysis = { "total_components": len(footprints), "placement_density": 0.0, "thermal_hotspots": [], "signal_groupings": {}, "optimization_opportunities": [] } # Simple placement density calculation if footprints: positions = [fp.position for fp in footprints] # Calculate bounding box and density analysis["placement_density"] = min(len(footprints) / 100.0, 1.0) # Simplified return analysis def _generate_placement_optimizations(footprints, analysis, goals): """Generate specific placement optimization suggestions.""" optimizations = { "component_moves": [], "component_rotations": [], "grouping_suggestions": [] } # Simple optimization logic (would be much more sophisticated in practice) for i, fp in enumerate(footprints[:3]): # Limit for demo if hasattr(fp, 'reference') and hasattr(fp, 'position'): optimizations["component_moves"].append({ "reference": fp.reference, "current_position": fp.position, "new_position": fp.position, # Would calculate optimal position "reason": "Thermal optimization" }) return optimizations # Helper functions for routing quality analysis def _analyze_routing_density(tracks, footprints): """Analyze routing density across the board.""" return { "total_tracks": len(tracks), "track_density": min(len(tracks) / max(len(footprints), 1), 5.0), "density_rating": "medium" } def _analyze_via_usage(tracks): """Analyze via usage patterns.""" via_count = len([t for t in tracks if hasattr(t, 'drill')]) # Simplified via detection return { "total_vias": via_count, "via_density": "normal", "via_types": {"standard": via_count} } def _analyze_trace_characteristics(tracks): """Analyze trace width and length characteristics.""" return { "total_traces": len(tracks), "width_distribution": {"standard": len(tracks)}, "length_statistics": {"average": 10.0, "max": 50.0} } def _analyze_signal_integrity(tracks, nets): """Analyze signal integrity aspects.""" return { "critical_nets": len([n for n in nets if "clk" in n.name.lower()]) if nets else 0, "high_speed_traces": 0, "impedance_controlled": False } def _analyze_thermal_aspects(tracks, footprints): """Analyze thermal management aspects.""" return { "thermal_vias": 0, "power_trace_width": "adequate", "heat_dissipation": "good" } def _analyze_manufacturability(tracks): """Analyze manufacturability constraints.""" return { "minimum_trace_width": 0.1, "minimum_spacing": 0.1, "drill_sizes": ["0.2", "0.3"], "manufacturability_rating": "good" } def _calculate_quality_score(analysis): """Calculate overall routing quality score.""" base_score = 75 connectivity = analysis.get("connectivity_analysis", {}) completion = connectivity.get("routing_completion", 0) # Simple scoring based on completion return min(int(base_score + completion * 0.25), 100) def _generate_routing_recommendations(analysis): """Generate routing improvement recommendations.""" recommendations = [] connectivity = analysis.get("connectivity_analysis", {}) unrouted = connectivity.get("unrouted_nets", 0) if unrouted > 0: recommendations.append(f"Complete routing for {unrouted} unrouted nets") recommendations.append("Consider adding test points for critical signals") recommendations.append("Verify impedance control for high-speed signals") return recommendations def _generate_routing_guidance(client, session_info, mode): """Generate routing guidance for interactive sessions.""" guidance = { "strategy": f"Optimized for {mode} routing", "constraints": [ "Maintain minimum trace width of 0.1mm", "Use 45-degree angles where possible", "Minimize via count on critical signals" ], "recommendations": [] } if session_info["session_type"] == "single_net": guidance["recommendations"].append( f"Route net '{session_info['target_net']}' with direct paths" ) else: guidance["recommendations"].append( f"Route {len(session_info['target_nets'])} nets in order of importance" ) return guidance def _get_net_specific_routing_config(net_names, priority): """Get routing configuration optimized for specific nets.""" base_config = { "via_costs": 50, "start_ripup_costs": 100, "max_iterations": 1000 } # Adjust based on priority if priority == "signal_integrity": base_config.update({ "via_costs": 80, # Minimize vias "automatic_neckdown": False }) elif priority == "density": base_config.update({ "via_costs": 30, # Allow more vias for density "automatic_neckdown": True }) return base_config def _analyze_net_routing_results(client, net_names, routing_result): """Analyze routing results for specific nets.""" try: connectivity = client.check_connectivity() routed_nets = set(connectivity.get("routed_net_names", [])) results = {} for net_name in net_names: results[net_name] = { "routed": net_name in routed_nets, "status": "routed" if net_name in routed_nets else "unrouted" } return results except Exception as e: return {"error": str(e)}