✨ Features: - 🧠 Shadow learner that builds intelligence from command patterns - 🛡️ Smart command validation with safety checks - 💾 Automatic context monitoring and backup system - 🔄 Session continuity across Claude restarts 📚 Documentation: - Complete Diátaxis-organized documentation - Learning-oriented tutorial for getting started - Task-oriented how-to guides for specific problems - Information-oriented reference for quick lookup - Understanding-oriented explanations of architecture 🚀 Installation: - One-command installation script - Bootstrap prompt for installation via Claude - Cross-platform compatibility - Comprehensive testing suite 🎯 Ready for real-world use and community feedback! 🤖 Generated with Claude Code Co-Authored-By: Claude <noreply@anthropic.com>
395 lines
15 KiB
Python
395 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
"""Shadow Learner - Pattern learning and prediction system"""
|
|
|
|
import difflib
import json
import logging
import math
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Any

from cachetools import TTLCache, LRUCache
|
|
|
|
try:
|
|
from .models import Pattern, ToolExecution, PatternDatabase, ValidationResult
|
|
except ImportError:
|
|
from models import Pattern, ToolExecution, PatternDatabase, ValidationResult
|
|
|
|
|
|
class ConfidenceCalculator:
    """Turn raw outcome counts into confidence scores for learned patterns."""

    @staticmethod
    def calculate_command_confidence(success_count: int, failure_count: int,
                                     recency_factor: float) -> float:
        """Score how strongly a command is associated with failure.

        Args:
            success_count: Number of successful runs observed.
            failure_count: Number of failed runs observed.
            recency_factor: Recency weight; 1.0 leaves the score unchanged,
                0.0 halves it.

        Returns:
            0.0 when nothing has been observed, otherwise a score clamped
            to the range [0.1, 0.99].
        """
        attempts = success_count + failure_count
        if not attempts:
            return 0.0

        # Fraction of observed runs that failed.
        failed_share = failure_count / attempts

        # Evidence weight grows linearly and saturates at ten observations.
        evidence_weight = min(1.0, attempts / 10.0)

        # Recency only ever scales the score between 50% and 100%.
        recency_weight = 0.5 + 0.5 * recency_factor

        raw_score = failed_share * evidence_weight * recency_weight

        # Apply the lower bound first, then the upper bound.
        floored = max(0.1, raw_score)
        return min(0.99, floored)

    @staticmethod
    def calculate_sequence_confidence(successful_sequences: int,
                                      total_sequences: int) -> float:
        """Score a tool sequence by its success rate, weighted by sample size.

        Args:
            successful_sequences: Number of times the sequence succeeded.
            total_sequences: Number of times the sequence was observed.

        Returns:
            0.0 when the sequence was never observed, otherwise the success
            rate scaled by a sample weight that saturates at five
            observations.
        """
        if not total_sequences:
            return 0.0

        succeeded_share = successful_sequences / total_sequences
        evidence_weight = min(1.0, total_sequences / 5.0)
        return succeeded_share * evidence_weight
|
|
|
|
|
|
class PatternMatcher:
    """Look up learned patterns that apply to a command or to a context."""

    def __init__(self, db: PatternDatabase):
        """Keep a reference to the pattern database that is searched."""
        self.db = db

    def fuzzy_command_match(self, command: str, threshold: float = 0.8) -> List[Pattern]:
        """Find command patterns that resemble ``command``.

        A pattern matches when its trigger command (compared
        case-insensitively) is equal to the first token of ``command``, is
        similar enough to it according to ``difflib.SequenceMatcher``, or
        occurs as a substring of any token of ``command``.

        Args:
            command: Full command line to look up.
            threshold: Similarity ratio that must be exceeded for a fuzzy
                match on the command name.

        Returns:
            Matching patterns ordered by descending confidence; an empty
            list when ``command`` contains no tokens.
        """
        cmd_tokens = command.lower().split()
        if not cmd_tokens:
            return []

        base_cmd = cmd_tokens[0]
        matches = []

        for pattern in self.db.command_patterns.values():
            pattern_cmd = (pattern.trigger.get("command") or "").lower()

            # A pattern without a command trigger must never match: the empty
            # string is a substring of every token, so the partial-match rule
            # below would otherwise return it for every command.
            if not pattern_cmd:
                continue

            is_exact = pattern_cmd == base_cmd
            is_similar = (
                not is_exact
                and difflib.SequenceMatcher(None, pattern_cmd, base_cmd).ratio() > threshold
            )
            # Partial match, e.g. pattern "pip" inside the token "pip3".
            is_partial = (
                not (is_exact or is_similar)
                and any(pattern_cmd in token for token in cmd_tokens)
            )

            if is_exact or is_similar or is_partial:
                matches.append(pattern)

        return sorted(matches, key=lambda p: p.confidence, reverse=True)

    def context_pattern_match(self, current_context: Dict[str, Any]) -> List[Pattern]:
        """Return the context patterns whose trigger is met by ``current_context``.

        Args:
            current_context: Key/value description of the current situation.

        Returns:
            Matching patterns ordered by descending confidence.
        """
        matches = [
            pattern
            for pattern in self.db.context_patterns.values()
            if self._context_matches(current_context, pattern.trigger)
        ]
        return sorted(matches, key=lambda p: p.confidence, reverse=True)

    def _context_matches(self, current: Dict[str, Any], trigger: Dict[str, Any]) -> bool:
        """Check whether every trigger condition holds in ``current``.

        String conditions match when the expected text occurs, ignoring
        case, inside the current value; any other type must compare equal.
        A trigger with no conditions matches every context.
        """
        for key, expected_value in trigger.items():
            if key not in current:
                return False

            current_value = current[key]

            if isinstance(expected_value, str) and isinstance(current_value, str):
                # Case-insensitive substring test for text values.
                if expected_value.lower() not in current_value.lower():
                    return False
            elif expected_value != current_value:
                return False

        return True
|
|
|
|
|
|
class LearningEngine:
    """Core learning algorithms that update the pattern database."""

    # Upper bound on distinct error messages remembered per command pattern.
    _MAX_COMMON_ERRORS = 5

    # Error classification table, checked in order. The bare "not found"
    # needle is a substring of "file not found", so the file entry has to be
    # tested before the command entry or it can never be selected.
    _ERROR_SIGNATURES = (
        ("file_not_found", ("no such file", "file not found")),
        ("command_not_found", ("command not found", "not found")),
        ("permission_denied", ("permission denied", "access denied")),
        ("connection_error", ("connection refused", "network unreachable")),
        ("syntax_error", ("syntax error", "invalid syntax")),
    )

    # Replacement executables suggested when a command fails.
    _ALTERNATIVES = {
        "pip": ["pip3", "python -m pip", "python3 -m pip"],
        "python": ["python3"],
        "node": ["nodejs"],
        "vim": ["nvim", "nano"],
    }

    def __init__(self, db: PatternDatabase):
        """Bind the engine to the pattern database it updates."""
        self.db = db
        self.confidence_calc = ConfidenceCalculator()

    def learn_from_execution(self, execution: ToolExecution):
        """Update all pattern families from a single tool execution.

        Bash executions feed the command patterns, every execution feeds the
        sequence learner, and failed executions feed the context patterns.
        """
        if execution.tool == "Bash":
            self._learn_command_pattern(execution)

        self._learn_sequence_pattern(execution)

        if not execution.success:
            self._learn_failure_context(execution)

    def _learn_command_pattern(self, execution: ToolExecution):
        """Record the outcome of a Bash command under its executable name.

        Commands that are missing, empty or whitespace-only are ignored.
        """
        command = execution.parameters.get("command", "")
        tokens = command.split() if command else []
        if not tokens:
            # Nothing to key the pattern on (also avoids indexing an empty list).
            return

        base_cmd = tokens[0]
        pattern_id = f"cmd_{base_cmd}"

        if pattern_id not in self.db.command_patterns:
            # First sighting of this executable: start with low confidence.
            self.db.command_patterns[pattern_id] = Pattern(
                pattern_id=pattern_id,
                pattern_type="command_execution",
                trigger={"command": base_cmd},
                prediction={
                    "success_count": 1 if execution.success else 0,
                    "failure_count": 0 if execution.success else 1,
                    "common_errors": [execution.error_message] if execution.error_message else []
                },
                confidence=0.3,
                evidence_count=1,
                last_seen=execution.timestamp,
                success_rate=1.0 if execution.success else 0.0
            )
            return

        pattern = self.db.command_patterns[pattern_id]
        stats = pattern.prediction

        counter = "success_count" if execution.success else "failure_count"
        stats[counter] = stats.get(counter, 0) + 1

        # Keep the list of seen errors current, without duplicates and bounded
        # so a noisy command cannot grow the stored pattern without limit.
        if execution.error_message:
            errors = stats.setdefault("common_errors", [])
            if (execution.error_message not in errors
                    and len(errors) < self._MAX_COMMON_ERRORS):
                errors.append(execution.error_message)

        successes = stats.get("success_count", 0)
        failures = stats.get("failure_count", 0)

        recency = self._calculate_recency(execution.timestamp)
        pattern.confidence = self.confidence_calc.calculate_command_confidence(
            successes, failures, recency
        )

        # Keep the stored success rate in step with the counters.
        total = successes + failures
        if total:
            pattern.success_rate = successes / total

        pattern.last_seen = execution.timestamp
        pattern.evidence_count += 1

    def _learn_sequence_pattern(self, execution: ToolExecution):
        """Derive identifiers for the most recent 2- and 3-tool sequences.

        This is currently a stub: the identifiers are computed but nothing
        is stored in the database.
        """
        # Last five recorded tools plus the one being learned right now.
        recent_tools = [e.tool for e in self.db.execution_history[-5:]]
        recent_tools.append(execution.tool)

        for seq_len in (2, 3):
            if len(recent_tools) >= seq_len:
                sequence = tuple(recent_tools[-seq_len:])
                pattern_id = f"seq_{'_'.join(sequence)}"
                # TODO: update or create the sequence pattern for pattern_id.

    def _learn_failure_context(self, execution: ToolExecution):
        """Record a failed execution under its classified error type.

        Executions without an error message, or whose message matches no
        known error type, are ignored.
        """
        if not execution.error_message:
            return

        error_key = self._extract_error_key(execution.error_message)
        if not error_key:
            return

        # NOTE(review): the id depends on the error type only, so the "tool"
        # stored in the trigger is that of the first execution that produced
        # this error type.
        pattern_id = f"ctx_error_{error_key}"

        if pattern_id in self.db.context_patterns:
            pattern = self.db.context_patterns[pattern_id]
            pattern.evidence_count += 1
            pattern.last_seen = execution.timestamp
            # Each repeat raises confidence by 0.05, capped at 0.95.
            pattern.confidence = min(0.95, pattern.confidence + 0.05)
        else:
            self.db.context_patterns[pattern_id] = Pattern(
                pattern_id=pattern_id,
                pattern_type="context_error",
                trigger={
                    "tool": execution.tool,
                    "error_type": error_key
                },
                prediction={
                    "likely_error": execution.error_message,
                    "suggestions": self._generate_suggestions(execution)
                },
                confidence=0.4,
                evidence_count=1,
                last_seen=execution.timestamp,
                success_rate=0.0
            )

    def _calculate_recency(self, timestamp: datetime) -> float:
        """Return a recency factor in [0.0, 1.0] (1.0 = now, 0.0 = very old).

        The factor decays exponentially with a 24-hour time constant, i.e.
        it falls to 1/e (about 0.37) after one day. Timestamps in the future
        count as fully recent.
        """
        # Use the timestamp's own tzinfo so naive and aware values both work;
        # subtracting a naive "now" from an aware timestamp raises TypeError.
        now = datetime.now(timestamp.tzinfo)
        age_hours = (now - timestamp).total_seconds() / 3600

        if age_hours <= 0:
            # Clock skew or a future timestamp: never exceed 1.0 (and avoid
            # overflowing exp() for timestamps far in the future).
            return 1.0

        return math.exp(-age_hours / 24.0)

    def _extract_error_key(self, error_message: str) -> Optional[str]:
        """Classify an error message into a known error type.

        Args:
            error_message: Raw error text; compared case-insensitively.

        Returns:
            The first error type in ``_ERROR_SIGNATURES`` with a needle that
            occurs in the message, or ``None`` when none matches.
        """
        lowered = error_message.lower()

        for error_type, needles in self._ERROR_SIGNATURES:
            if any(needle in lowered for needle in needles):
                return error_type

        return None

    def _generate_suggestions(self, execution: ToolExecution) -> List[str]:
        """Suggest alternative executables for a failed Bash command.

        Returns:
            One ``"Try '<alternative> <original args>'"`` entry per known
            alternative of the command's executable; an empty list for
            non-Bash tools, empty commands or executables without known
            alternatives.
        """
        if execution.tool != "Bash":
            return []

        tokens = (execution.parameters.get("command") or "").split()
        if not tokens:
            return []

        base_cmd, *args = tokens
        suggestions = []
        for alt in self._ALTERNATIVES.get(base_cmd, ()):
            # Joining through a list avoids a trailing space when the
            # original command had no arguments.
            replacement = " ".join([alt, *args])
            suggestions.append(f"Try '{replacement}'")
        return suggestions
|
|
|
|
|
|
class PredictionEngine:
    """Generate predictions and suggestions from matched patterns."""

    # Only the best-matching patterns are consulted.
    _MAX_PATTERNS = 3
    # A pattern must exceed this confidence to influence the prediction.
    _MIN_PATTERN_CONFIDENCE = 0.7
    # Failure share above which a command is predicted to fail.
    _HIGH_FAILURE_RATE = 0.6

    def __init__(self, matcher: PatternMatcher):
        """Keep the matcher used to look up relevant patterns."""
        self.matcher = matcher

    def predict_command_outcome(self, command: str, context: Dict[str, Any]) -> Dict[str, Any]:
        """Predict whether ``command`` will succeed and collect suggestions.

        Args:
            command: Full command line about to be executed.
            context: Description of the current situation, passed to the
                matcher's context lookup.

        Returns:
            A dict with ``likely_success`` (bool), ``confidence`` (float),
            ``warnings`` and ``suggestions`` (lists of strings). Without a
            confident, frequently failing match the result predicts success
            with confidence 0.5.
        """
        command_patterns = self.matcher.fuzzy_command_match(command)
        # NOTE(review): context patterns are looked up but not yet folded
        # into the prediction.
        context_patterns = self.matcher.context_pattern_match(context)

        prediction = {
            "likely_success": True,
            "confidence": 0.5,
            "warnings": [],
            "suggestions": []
        }

        failing_confidences = []

        for pattern in command_patterns[:self._MAX_PATTERNS]:
            if pattern.confidence <= self._MIN_PATTERN_CONFIDENCE:
                continue

            failure_rate = pattern.prediction.get("failure_count", 0) / max(1, pattern.evidence_count)
            if failure_rate <= self._HIGH_FAILURE_RATE:
                continue

            prediction["likely_success"] = False
            failing_confidences.append(pattern.confidence)

            # Several patterns can flag the same command; warn only once.
            warning = f"Command '{command.split()[0]}' often fails"
            if warning not in prediction["warnings"]:
                prediction["warnings"].append(warning)

            for suggestion in pattern.prediction.get("suggestions", []):
                if suggestion not in prediction["suggestions"]:
                    prediction["suggestions"].append(suggestion)

        if failing_confidences:
            # Report the strongest evidence rather than whichever failing
            # pattern happened to be visited last.
            prediction["confidence"] = max(failing_confidences)

        return prediction
|
|
|
|
|
|
class ShadowLearner:
    """Main shadow learner interface: learn, predict and persist patterns."""

    _log = logging.getLogger(__name__)

    def __init__(self, storage_path: str = ".claude_hooks/patterns"):
        """Create the storage directory, load the database and wire the engines.

        Args:
            storage_path: Directory holding ``patterns.json`` and its backup.
        """
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)

        self.db = self._load_database()
        self.matcher = PatternMatcher(self.db)
        self.learning_engine = LearningEngine(self.db)
        self.prediction_engine = PredictionEngine(self.matcher)

        # Performance cache: predictions are reused for up to five minutes.
        self.prediction_cache = TTLCache(maxsize=1000, ttl=300)

    def learn_from_execution(self, execution: ToolExecution):
        """Learn from a tool execution and record it in the history.

        Errors raised while learning are logged and suppressed so that a
        learning failure never breaks the caller.
        """
        try:
            self.learning_engine.learn_from_execution(execution)
            self.db.execution_history.append(execution)

            # Trim history to keep memory usage reasonable.
            if len(self.db.execution_history) > 1000:
                self.db.execution_history = self.db.execution_history[-500:]
        except Exception:
            self._log.warning("Learning from execution failed", exc_info=True)
        finally:
            # Patterns may have changed, so cached predictions are stale.
            self.prediction_cache.clear()

    def predict_command_outcome(self, command: str,
                                context: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Predict the outcome of ``command``, reusing a cached result if any.

        Args:
            command: Full command line to evaluate.
            context: Optional situation description; an empty dict is used
                when omitted. The cache is keyed on the command only.

        Returns:
            The prediction dict produced by the prediction engine.
        """
        # Key on the command text itself: hash() values can collide.
        cache_key = f"cmd_pred:{command}"

        # A single get() avoids a KeyError if the entry expires between a
        # membership test and the lookup.
        cached = self.prediction_cache.get(cache_key)
        if cached is not None:
            return cached

        prediction = self.prediction_engine.predict_command_outcome(
            command, context or {}
        )

        self.prediction_cache[cache_key] = prediction
        return prediction

    def save_database(self):
        """Save learned patterns to ``patterns.json`` in the storage directory.

        The data is written to a temporary file first and only then moved
        into place, with the previous file kept as ``patterns.backup.json``.
        A failed save is logged and leaves the existing file untouched.
        """
        patterns_file = self.storage_path / "patterns.json"
        backup_file = self.storage_path / "patterns.backup.json"
        temp_file = self.storage_path / "patterns.json.tmp"

        try:
            with open(temp_file, "w", encoding="utf-8") as f:
                json.dump(self.db.to_dict(), f, indent=2)

            # replace() overwrites an existing target on every platform,
            # whereas rename() fails on Windows once a backup exists.
            if patterns_file.exists():
                patterns_file.replace(backup_file)
            temp_file.replace(patterns_file)
        except Exception:
            # Save failures shouldn't break the system.
            self._log.warning("Failed to save pattern database to %s",
                              patterns_file, exc_info=True)
            try:
                temp_file.unlink()
            except OSError:
                pass  # Temporary file was never created or is already gone.

    def _load_database(self) -> PatternDatabase:
        """Load the pattern database from disk.

        Returns:
            The database read from ``patterns.json``, or from
            ``patterns.backup.json`` when the primary file is missing or
            unreadable, or an empty database when neither can be loaded.
        """
        candidates = (
            self.storage_path / "patterns.json",
            self.storage_path / "patterns.backup.json",
        )

        for candidate in candidates:
            try:
                if candidate.exists():
                    with open(candidate, "r", encoding="utf-8") as f:
                        data = json.load(f)
                    return PatternDatabase.from_dict(data)
            except Exception:
                self._log.warning("Could not load pattern database from %s",
                                  candidate, exc_info=True)

        # Nothing usable on disk: start with an empty database.
        return PatternDatabase()