#!/usr/bin/env python3
"""Context Monitor - Token estimation and backup trigger system"""
import json
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, Optional

try:
    from .models import BackupDecision
except ImportError:
    from models import BackupDecision


class ContextMonitor:
    """Monitors conversation context and predicts token usage.

    Tracks prompt/tool/file activity to heuristically estimate how much of
    the model's context window is consumed, and decides when a backup should
    be triggered. All estimates are deliberately conservative; on any
    estimation failure the class errs toward recommending a backup.
    """

    def __init__(self, storage_path: str = ".claude_hooks"):
        """Create a monitor, ensuring the state directory exists.

        Args:
            storage_path: Directory where session state is persisted so a
                restarted process within the last hour can resume counting.
        """
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(parents=True, exist_ok=True)
        self.session_start = datetime.now()
        self.prompt_count = 0
        self.estimated_tokens = 0
        self.tool_executions = 0
        self.file_operations = 0

        # Token estimation constants (conservative estimates)
        self.TOKENS_PER_CHAR = 0.25   # Average for English text
        self.TOOL_OVERHEAD = 200      # Tokens per tool call
        self.SYSTEM_OVERHEAD = 500    # Base conversation overhead
        self.MAX_CONTEXT = 200000     # Claude's context limit

        # Backup thresholds (fractions of MAX_CONTEXT)
        self.backup_threshold = 0.85
        self.emergency_threshold = 0.95

        # Error tracking for get_context_usage_ratio fallback behavior
        self.estimation_errors = 0
        self.max_errors = 5
        self._last_good_estimate = 0.5

        # Load previous session state if available (recent sessions only)
        self._load_session_state()

    def estimate_prompt_tokens(self, prompt_data: Dict[str, Any]) -> int:
        """Estimate tokens in user prompt.

        Args:
            prompt_data: Hook payload; the text is read from the "prompt"
                key (assumed to be a string — TODO confirm against caller).

        Returns:
            Estimated token count, including base system overhead.
            Falls back to 1000 if the payload is malformed.
        """
        try:
            prompt_text = prompt_data.get("prompt", "")
            # Basic character count estimation
            base_tokens = len(prompt_text) * self.TOKENS_PER_CHAR
            # Add overhead for system prompts, context, etc.
            overhead_tokens = self.SYSTEM_OVERHEAD
            return int(base_tokens + overhead_tokens)
        except Exception:
            # Fallback estimation — never let a bad payload break tracking
            return 1000

    def estimate_conversation_tokens(self) -> int:
        """Estimate total conversation tokens.

        Combines accumulated prompt tokens with per-tool, per-file, and
        per-turn history overheads, clamped to the context limit.

        Returns:
            Estimated total tokens, at most MAX_CONTEXT.
        """
        try:
            # Base conversation context (accumulated prompt estimates)
            base_tokens = self.estimated_tokens
            # Add tool execution overhead
            tool_tokens = self.tool_executions * self.TOOL_OVERHEAD
            # Add file operation overhead (file contents in context)
            file_tokens = self.file_operations * 1000  # Average file size
            # Conversation history grows over time
            history_tokens = self.prompt_count * 300  # Average response size
            total = base_tokens + tool_tokens + file_tokens + history_tokens
            return min(total, self.MAX_CONTEXT)
        except Exception:
            return self._handle_estimation_failure()

    def get_context_usage_ratio(self) -> float:
        """Get estimated context usage as ratio (0.0 to 1.0).

        On repeated estimation failures returns a conservative 0.7;
        on a single failure returns the last known good ratio.
        """
        try:
            estimated = self.estimate_conversation_tokens()
            ratio = min(1.0, estimated / self.MAX_CONTEXT)
            # Reset error counter on success
            self.estimation_errors = 0
            self._last_good_estimate = ratio
            return ratio
        except Exception:
            self.estimation_errors += 1
            # Too many errors - use conservative fallback
            if self.estimation_errors >= self.max_errors:
                return 0.7  # Conservative threshold
            # Single error - use last known good value
            return self._last_good_estimate

    def should_trigger_backup(self, threshold: Optional[float] = None) -> bool:
        """Check if backup should be triggered.

        Args:
            threshold: Usage ratio that triggers a backup; defaults to
                self.backup_threshold.

        Returns:
            True when usage, session duration, or a combination of both
            warrants a backup. Any internal error returns True (fail-safe).
        """
        try:
            if threshold is None:
                threshold = self.backup_threshold
            usage = self.get_context_usage_ratio()

            # Edge case: Very early in session
            if self.prompt_count < 2:
                return False

            # Edge case: Already near context limit
            if usage > self.emergency_threshold:
                # Emergency backup - don't wait for other conditions
                return True

            # Session duration factor
            session_hours = (datetime.now() - self.session_start).total_seconds() / 3600
            complexity_factor = (self.tool_executions + self.file_operations) / 20

            # Trigger earlier for complex sessions.
            # NOTE(review): complexity_factor is uncapped, so very active
            # sessions (>~170 ops) drive the threshold to <= 0 and always
            # trigger — confirm this is the intended behavior.
            adjusted_threshold = threshold - (complexity_factor * 0.1)

            # Multiple trigger conditions
            return (
                usage > adjusted_threshold
                or session_hours > 2.0
                or (usage > 0.7 and session_hours > 1.0)
            )
        except Exception:
            # When in doubt, backup (better safe than sorry)
            return True

    def update_from_prompt(self, prompt_data: Dict[str, Any]):
        """Update estimates when user submits prompt."""
        try:
            self.prompt_count += 1
            prompt_tokens = self.estimate_prompt_tokens(prompt_data)
            self.estimated_tokens += prompt_tokens
            # Save state periodically (every 5th prompt)
            if self.prompt_count % 5 == 0:
                self._save_session_state()
        except Exception:
            pass  # Don't let tracking errors break the system

    def update_from_tool_use(self, tool_data: Dict[str, Any]):
        """Update estimates when tools are used.

        Args:
            tool_data: Hook payload with "tool" (name) and "parameters" keys.
        """
        try:
            self.tool_executions += 1
            tool_name = tool_data.get("tool", "")

            # File operations add content to context
            if tool_name in ["Read", "Edit", "Write", "Glob", "MultiEdit"]:
                self.file_operations += 1
                # Large outputs add to context
                parameters = tool_data.get("parameters", {})
                if "file_path" in parameters:
                    self.estimated_tokens += 500  # Estimated file content

            # Save state periodically (every 10th tool call)
            if self.tool_executions % 10 == 0:
                self._save_session_state()
        except Exception:
            pass  # Don't let tracking errors break the system

    def check_backup_triggers(self, hook_event: str, data: Dict[str, Any]) -> BackupDecision:
        """Check all backup trigger conditions.

        Args:
            hook_event: Name of the hook event (currently unused; kept for
                interface compatibility).
            data: Hook payload, inspected for critical operations.

        Returns:
            A BackupDecision; on internal failure, a fail-safe decision
            recommending a backup.
        """
        try:
            # Context-based triggers
            if self.should_trigger_backup():
                usage = self.get_context_usage_ratio()
                urgency = "high" if usage > self.emergency_threshold else "medium"
                return BackupDecision(
                    should_backup=True,
                    reason="context_threshold",
                    urgency=urgency,
                    metadata={"usage_ratio": usage}
                )

            # Activity-based triggers
            if self._should_backup_by_activity():
                return BackupDecision(
                    should_backup=True,
                    reason="activity_threshold",
                    urgency="medium"
                )

            # Critical operation triggers
            if self._is_critical_operation(data):
                return BackupDecision(
                    should_backup=True,
                    reason="critical_operation",
                    urgency="high"
                )

            return BackupDecision(should_backup=False, reason="no_trigger")
        except Exception:
            # If trigger checking fails, err on side of safety
            return BackupDecision(
                should_backup=True,
                reason="trigger_check_failed",
                urgency="medium"
            )

    def _should_backup_by_activity(self) -> bool:
        """Activity-based backup triggers.

        NOTE(review): the modulo checks fire on every call while the counter
        sits at a multiple of 10/25, so this may trigger repeatedly until the
        counter advances — confirm callers expect that.
        """
        # Backup after significant file modifications
        if (self.file_operations % 10 == 0 and self.file_operations > 0):
            return True
        # Backup after many tool executions
        if (self.tool_executions % 25 == 0 and self.tool_executions > 0):
            return True
        return False

    def _is_critical_operation(self, data: Dict[str, Any]) -> bool:
        """Detect operations that should trigger immediate backup."""
        tool = data.get("tool", "")
        params = data.get("parameters", {})

        # Git operations and package installations via shell commands
        if tool == "Bash":
            command = params.get("command", "").lower()
            if any(git_cmd in command for git_cmd in ["git commit", "git push", "git merge"]):
                return True
            # Package installations
            if any(pkg_cmd in command for pkg_cmd in ["npm install", "pip install", "cargo install"]):
                return True

        # Major file operations
        if tool in ["Write", "MultiEdit"]:
            content = params.get("content", "")
            if len(content) > 5000:  # Large file changes
                return True

        return False

    def _handle_estimation_failure(self) -> int:
        """Fallback estimation when primary method fails.

        Tries time-based, then activity-based, then a conservative default.
        """
        # Method 1: Time-based estimation
        session_duration = (datetime.now() - self.session_start).total_seconds() / 3600
        if session_duration > 1.0:  # 1 hour = likely high usage
            return int(self.MAX_CONTEXT * 0.8)

        # Method 2: Activity-based estimation
        total_activity = self.tool_executions + self.file_operations
        if total_activity > 50:  # High activity = likely high context
            return int(self.MAX_CONTEXT * 0.75)

        # Method 3: Conservative default
        return int(self.MAX_CONTEXT * 0.5)

    def _save_session_state(self):
        """Save current session state to disk (best-effort)."""
        try:
            state_file = self.storage_path / "session_state.json"
            state = {
                "session_start": self.session_start.isoformat(),
                "prompt_count": self.prompt_count,
                "estimated_tokens": self.estimated_tokens,
                "tool_executions": self.tool_executions,
                "file_operations": self.file_operations,
                "last_updated": datetime.now().isoformat()
            }
            with open(state_file, 'w') as f:
                json.dump(state, f, indent=2)
        except (OSError, TypeError, ValueError):
            pass  # Don't let state saving errors break the system

    def _load_session_state(self):
        """Load previous session state if available.

        Restores counters — and the original session start time — when the
        saved state is less than one hour old; otherwise starts fresh.
        """
        try:
            state_file = self.storage_path / "session_state.json"
            if state_file.exists():
                with open(state_file, 'r') as f:
                    state = json.load(f)
                # Only load if session is recent (within last hour)
                last_updated = datetime.fromisoformat(state["last_updated"])
                if datetime.now() - last_updated < timedelta(hours=1):
                    self.prompt_count = state.get("prompt_count", 0)
                    self.estimated_tokens = state.get("estimated_tokens", 0)
                    self.tool_executions = state.get("tool_executions", 0)
                    self.file_operations = state.get("file_operations", 0)
                    # Restore the original start time so session-duration
                    # triggers survive a restart (it was saved but never
                    # read back before this fix).
                    saved_start = state.get("session_start")
                    if saved_start:
                        self.session_start = datetime.fromisoformat(saved_start)
        except (OSError, KeyError, TypeError, ValueError):
            pass  # If loading fails, start fresh

    def get_session_summary(self) -> Dict[str, Any]:
        """Get current session summary."""
        return {
            "session_duration": str(datetime.now() - self.session_start),
            "prompt_count": self.prompt_count,
            "tool_executions": self.tool_executions,
            "file_operations": self.file_operations,
            "estimated_tokens": self.estimate_conversation_tokens(),
            "context_usage_ratio": self.get_context_usage_ratio(),
            "should_backup": self.should_trigger_backup()
        }