#!/usr/bin/env python3
"""Shadow Learner - Pattern learning and prediction system"""

import contextlib
import copy
import difflib
import json
import logging
import math
import shutil
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional

from cachetools import LRUCache, TTLCache

try:
    from .models import Pattern, ToolExecution, PatternDatabase, ValidationResult
except ImportError:
    from models import Pattern, ToolExecution, PatternDatabase, ValidationResult

logger = logging.getLogger(__name__)


class ConfidenceCalculator:
    """Calculate confidence scores for learned patterns"""

    @staticmethod
    def calculate_command_confidence(success_count: int, failure_count: int,
                                     recency_factor: float) -> float:
        """Calculate confidence that a command is a *failure* pattern.

        Args:
            success_count: Number of recorded successful runs.
            failure_count: Number of recorded failed runs.
            recency_factor: Weight in [0, 1]; 1.0 means the latest
                observation is very recent.

        Returns:
            0.0 when there are no attempts, otherwise a value clamped to
            [0.1, 0.99].
        """
        total_attempts = success_count + failure_count
        if total_attempts <= 0:
            return 0.0

        # Base confidence from failure rate
        failure_rate = failure_count / total_attempts

        # Sample size adjustment (more data = more confidence); plateaus at
        # 10 samples.
        sample_factor = min(1.0, total_attempts / 10.0)

        # Recency scales the score between 50% (old) and 100% (fresh).
        confidence = failure_rate * sample_factor * (0.5 + 0.5 * recency_factor)

        return min(0.99, max(0.1, confidence))

    @staticmethod
    def calculate_sequence_confidence(successful_sequences: int,
                                      total_sequences: int) -> float:
        """Calculate confidence for tool sequence patterns.

        Returns the success rate scaled by a sample-size factor that
        plateaus at 5 sequences; 0.0 when ``total_sequences`` is 0.
        """
        if total_sequences == 0:
            return 0.0

        success_rate = successful_sequences / total_sequences
        sample_factor = min(1.0, total_sequences / 5.0)

        return success_rate * sample_factor


class PatternMatcher:
    """Advanced pattern matching with fuzzy logic"""

    def __init__(self, db: PatternDatabase):
        self.db = db

    def fuzzy_command_match(self, command: str, threshold: float = 0.8) -> List[Pattern]:
        """Find command patterns similar to ``command``.

        A pattern matches when its trigger command (case-insensitively)
        equals the first token of ``command``, has a ``difflib`` similarity
        ratio above ``threshold`` with it, or is a substring of any token
        (e.g. pattern "pip" matches "pip3 install").

        Returns:
            Matching patterns sorted by confidence, highest first. Empty
            when ``command`` has no tokens.
        """
        cmd_tokens = command.lower().split()
        if not cmd_tokens:
            return []

        base_cmd = cmd_tokens[0]
        matches = []

        for pattern in self.db.command_patterns.values():
            pattern_cmd = (pattern.trigger.get("command") or "").lower()

            # An empty trigger would be a substring of every token and so
            # match every command; such patterns carry no signal.
            if not pattern_cmd:
                continue

            if (
                pattern_cmd == base_cmd
                or difflib.SequenceMatcher(None, pattern_cmd, base_cmd).ratio() > threshold
                or any(pattern_cmd in token for token in cmd_tokens)
            ):
                matches.append(pattern)

        return sorted(matches, key=lambda p: p.confidence, reverse=True)

    def context_pattern_match(self, current_context: Dict[str, Any]) -> List[Pattern]:
        """Return context patterns whose trigger is satisfied by the context.

        Results are sorted by confidence, highest first.
        """
        matches = [
            pattern
            for pattern in self.db.context_patterns.values()
            if self._context_matches(current_context, pattern.trigger)
        ]
        return sorted(matches, key=lambda p: p.confidence, reverse=True)

    def _context_matches(self, current: Dict[str, Any], trigger: Dict[str, Any]) -> bool:
        """Check if current context matches all trigger conditions.

        String values match by case-insensitive substring (the expected
        value must occur inside the current value); all other values must
        compare equal. A key missing from ``current`` fails the match.
        """
        for key, expected_value in trigger.items():
            if key not in current:
                return False

            current_value = current[key]

            if isinstance(expected_value, str) and isinstance(current_value, str):
                if expected_value.lower() not in current_value.lower():
                    return False
            elif expected_value != current_value:
                return False

        return True


class LearningEngine:
    """Core learning algorithms"""

    # Error categories and the lowercase substrings that identify them.
    # Order matters: the first category with a matching substring wins, so
    # "file not found" must be tested before the generic "not found".
    _ERROR_PATTERNS = (
        ("permission_denied", ("permission denied", "access denied")),
        ("file_not_found", ("no such file", "file not found")),
        ("command_not_found", ("command not found", "not found")),
        ("connection_error", ("connection refused", "network unreachable")),
        ("syntax_error", ("syntax error", "invalid syntax")),
    )

    # Common command alternatives offered when a Bash command fails.
    _COMMAND_ALTERNATIVES = {
        "pip": ["pip3", "python -m pip", "python3 -m pip"],
        "python": ["python3"],
        "node": ["nodejs"],
        "vim": ["nvim", "nano"],
    }

    def __init__(self, db: PatternDatabase):
        self.db = db
        self.confidence_calc = ConfidenceCalculator()

    def learn_from_execution(self, execution: ToolExecution):
        """Main learning entry point.

        Updates command patterns (Bash executions only), sequence patterns,
        and, for failed executions, failure-context patterns.
        """
        if execution.tool == "Bash":
            self._learn_command_pattern(execution)

        self._learn_sequence_pattern(execution)

        if not execution.success:
            self._learn_failure_context(execution)

    def _learn_command_pattern(self, execution: ToolExecution):
        """Create or update the command pattern for a Bash execution.

        Patterns are keyed by the first token of the command. Executions
        with a missing, empty or whitespace-only command are ignored.
        """
        command = execution.parameters.get("command", "")
        tokens = command.split() if command else []
        if not tokens:
            return

        base_cmd = tokens[0]
        pattern_id = f"cmd_{base_cmd}"

        if pattern_id not in self.db.command_patterns:
            # Create new pattern
            self.db.command_patterns[pattern_id] = Pattern(
                pattern_id=pattern_id,
                pattern_type="command_execution",
                trigger={"command": base_cmd},
                prediction={
                    "success_count": 1 if execution.success else 0,
                    "failure_count": 0 if execution.success else 1,
                    "common_errors": [execution.error_message] if execution.error_message else []
                },
                confidence=0.3,  # Start with low confidence
                evidence_count=1,
                last_seen=execution.timestamp,
                success_rate=1.0 if execution.success else 0.0
            )
            return

        pattern = self.db.command_patterns[pattern_id]

        # Update statistics
        counter = "success_count" if execution.success else "failure_count"
        pattern.prediction[counter] = pattern.prediction.get(counter, 0) + 1

        successes = pattern.prediction.get("success_count", 0)
        failures = pattern.prediction.get("failure_count", 0)

        # Recalculate confidence
        recency = self._calculate_recency(execution.timestamp)
        pattern.confidence = self.confidence_calc.calculate_command_confidence(
            successes, failures, recency
        )
        # Keep the stored success rate in step with the counters; at least
        # one of them was just incremented, so the total is normally >= 1.
        total = successes + failures
        if total > 0:
            pattern.success_rate = successes / total
        pattern.last_seen = execution.timestamp
        pattern.evidence_count += 1

    def _learn_sequence_pattern(self, execution: ToolExecution):
        """Learn from tool sequence patterns.

        Placeholder: builds the ids of the 2- and 3-tool sequences ending
        at this execution but does not store anything yet.
        """
        # Get recent tool history (last 5 tools)
        recent_tools = [e.tool for e in self.db.execution_history[-5:]]
        recent_tools.append(execution.tool)

        # Look for sequences of 2-3 tools
        for seq_len in (2, 3):
            if len(recent_tools) < seq_len:
                continue
            sequence = tuple(recent_tools[-seq_len:])
            pattern_id = f"seq_{'_'.join(sequence)}"
            # Update or create sequence pattern
            # (Simplified implementation - could be expanded)

    def _learn_failure_context(self, execution: ToolExecution):
        """Create or reinforce a context pattern for a failed execution.

        Does nothing when the execution has no error message or the message
        does not map to a known error category.
        """
        if not execution.error_message:
            return

        # Extract key error indicators
        error_key = self._extract_error_key(execution.error_message)
        if not error_key:
            return

        pattern_id = f"ctx_error_{error_key}"

        if pattern_id in self.db.context_patterns:
            pattern = self.db.context_patterns[pattern_id]
            pattern.evidence_count += 1
            pattern.last_seen = execution.timestamp
            # Each repeated failure raises confidence by 0.05, capped at 0.95
            pattern.confidence = min(0.95, pattern.confidence + 0.05)
        else:
            # Create new context pattern
            self.db.context_patterns[pattern_id] = Pattern(
                pattern_id=pattern_id,
                pattern_type="context_error",
                trigger={
                    "tool": execution.tool,
                    "error_type": error_key
                },
                prediction={
                    "likely_error": execution.error_message,
                    "suggestions": self._generate_suggestions(execution)
                },
                confidence=0.4,
                evidence_count=1,
                last_seen=execution.timestamp,
                success_rate=0.0
            )

    def _calculate_recency(self, timestamp: datetime) -> float:
        """Calculate recency factor (1.0 = very recent, 0.0 = very old).

        Exponential decay with a 24-hour time constant: the factor is
        exp(-age_hours / 24), i.e. about 0.37 after one day. Timestamps in
        the future are treated as "now" so the factor never exceeds 1.0.
        """
        # Use the timestamp's own tzinfo so naive and aware datetimes can
        # both be subtracted without a TypeError.
        now = datetime.now(timestamp.tzinfo)
        age_hours = (now - timestamp).total_seconds() / 3600

        if age_hours <= 0:
            return 1.0

        return math.exp(-age_hours / 24.0)

    def _extract_error_key(self, error_message: str) -> Optional[str]:
        """Map an error message to an error category, or None if unknown.

        Matching is a case-insensitive substring search; categories are
        tried in the order given by ``_ERROR_PATTERNS``.
        """
        lowered = error_message.lower()

        for error_type, needles in self._ERROR_PATTERNS:
            if any(needle in lowered for needle in needles):
                return error_type

        return None

    def _generate_suggestions(self, execution: ToolExecution) -> List[str]:
        """Generate suggestions based on failed execution.

        Only Bash executions whose first command token has a known
        alternative produce suggestions; each keeps the original arguments.
        """
        if execution.tool != "Bash":
            return []

        command = execution.parameters.get("command", "")
        tokens = command.split() if command else []
        if not tokens:
            return []

        base_cmd, *args = tokens

        # Joining alt with the args avoids a trailing space when the
        # command was invoked without arguments.
        return [
            f"Try '{' '.join([alt, *args])}'"
            for alt in self._COMMAND_ALTERNATIVES.get(base_cmd, [])
        ]


class PredictionEngine:
    """Generate predictions and suggestions"""

    # Only patterns above this confidence influence a prediction.
    _MIN_PATTERN_CONFIDENCE = 0.7
    # Failure rate above which a command is predicted to fail.
    _HIGH_FAILURE_RATE = 0.6
    # Number of best-ranked matches inspected per pattern kind.
    _TOP_MATCHES = 3

    def __init__(self, matcher: PatternMatcher):
        self.matcher = matcher

    def predict_command_outcome(self, command: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Predict if a command will succeed and suggest alternatives.

        Args:
            command: The command line about to be executed.
            context: Current context, matched against context patterns.

        Returns:
            A dict with keys ``likely_success`` (bool), ``confidence``
            (float; 0.5 unless a confident failing pattern matched, in which
            case the highest such pattern confidence), ``warnings`` and
            ``suggestions`` (lists without duplicates).
        """
        # Find matching patterns
        command_patterns = self.matcher.fuzzy_command_match(command)
        context_patterns = self.matcher.context_pattern_match(context)

        warnings: List[Any] = []
        suggestions: List[Any] = []
        prediction = {
            "likely_success": True,
            "confidence": 0.5,
            "warnings": warnings,
            "suggestions": suggestions,
        }

        tokens = command.split()
        base_cmd = tokens[0] if tokens else ""

        # Analyze command patterns
        for pattern in command_patterns[:self._TOP_MATCHES]:
            if pattern.confidence <= self._MIN_PATTERN_CONFIDENCE:
                continue

            failure_rate = pattern.prediction.get("failure_count", 0) / max(1, pattern.evidence_count)
            if failure_rate <= self._HIGH_FAILURE_RATE:
                continue

            if prediction["likely_success"]:
                prediction["likely_success"] = False
                prediction["confidence"] = pattern.confidence
            else:
                # Never let a weaker match lower the reported confidence.
                prediction["confidence"] = max(prediction["confidence"], pattern.confidence)

            self._extend_unique(warnings, [f"Command '{base_cmd}' often fails"])
            self._extend_unique(suggestions, pattern.prediction.get("suggestions", []))

        # Confident context patterns contribute the suggestions recorded
        # when the matching failure was learned.
        for pattern in context_patterns[:self._TOP_MATCHES]:
            if pattern.confidence > self._MIN_PATTERN_CONFIDENCE:
                self._extend_unique(suggestions, pattern.prediction.get("suggestions", []))

        return prediction

    @staticmethod
    def _extend_unique(target: List[Any], items: List[Any]) -> None:
        """Append each item not already in ``target``, preserving order."""
        for item in items:
            if item not in target:
                target.append(item)


class ShadowLearner:
    """Main shadow learner interface"""

    def __init__(self, storage_path: str = ".claude_hooks/patterns"):
        """Create the storage directory if needed and load saved patterns."""
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)

        self.db = self._load_database()
        self.matcher = PatternMatcher(self.db)
        self.learning_engine = LearningEngine(self.db)
        self.prediction_engine = PredictionEngine(self.matcher)

        # Performance caches
        self.prediction_cache = TTLCache(maxsize=1000, ttl=300)  # 5-minute cache

    def learn_from_execution(self, execution: ToolExecution):
        """Learn from tool execution and record it in the history.

        Best effort: failures are logged and never propagate to the caller.
        """
        try:
            self.learning_engine.learn_from_execution(execution)
            self.db.execution_history.append(execution)

            # Trim history to keep memory usage reasonable: once it exceeds
            # 1000 entries, keep only the newest 500.
            if len(self.db.execution_history) > 1000:
                self.db.execution_history = self.db.execution_history[-500:]

        except Exception:
            # Learning failures shouldn't break the system
            logger.warning("Shadow learner failed to learn from execution", exc_info=True)

    def predict_command_outcome(self, command: str,
                                context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Predict command outcome with caching.

        Results are cached per (command, context) pair for the cache's TTL.
        The returned dict is a private copy, so callers may mutate it
        without affecting later cache hits.
        """
        context = context or {}

        try:
            context_key = json.dumps(context, sort_keys=True, default=str)
        except (TypeError, ValueError):
            # Non-string or unsortable keys: fall back to a plain repr.
            context_key = repr(context)
        cache_key = (command, context_key)

        cached = self.prediction_cache.get(cache_key)
        if cached is not None:
            return copy.deepcopy(cached)

        prediction = self.prediction_engine.predict_command_outcome(command, context)

        self.prediction_cache[cache_key] = prediction
        return copy.deepcopy(prediction)

    def save_database(self):
        """Save learned patterns to disk.

        The data is first written to a temporary file; the current
        ``patterns.json`` is then copied to ``patterns.backup.json`` and the
        temporary file is moved over ``patterns.json``. A failed write
        therefore leaves the previous file untouched. Best effort: failures
        are logged and never propagate to the caller.
        """
        patterns_file = self.storage_path / "patterns.json"
        backup_file = self.storage_path / "patterns.backup.json"
        temp_file = self.storage_path / "patterns.json.tmp"

        try:
            with open(temp_file, "w", encoding="utf-8") as f:
                json.dump(self.db.to_dict(), f, indent=2)

            # Create backup of existing data
            if patterns_file.exists():
                shutil.copy2(patterns_file, backup_file)

            # Path.replace overwrites an existing target on every platform.
            temp_file.replace(patterns_file)

        except Exception:
            # Save failures shouldn't break the system
            logger.warning("Failed to save pattern database to %s", patterns_file, exc_info=True)
            with contextlib.suppress(OSError):
                temp_file.unlink()

    def _load_database(self) -> PatternDatabase:
        """Load patterns database from disk.

        Tries ``patterns.json`` first and ``patterns.backup.json`` second;
        returns an empty ``PatternDatabase`` when neither can be loaded.
        """
        candidates = (
            self.storage_path / "patterns.json",
            self.storage_path / "patterns.backup.json",
        )

        for candidate in candidates:
            try:
                if not candidate.exists():
                    continue
                with open(candidate, "r", encoding="utf-8") as f:
                    data = json.load(f)
                return PatternDatabase.from_dict(data)
            except Exception:
                # If loading fails, try the next candidate
                logger.warning("Failed to load pattern database from %s", candidate, exc_info=True)

        return PatternDatabase()