""" Logger Interceptor Base Class Abstract base class for intercepting and redirecting CLI tool logging to MCP context. Provides the foundation for bidirectional communication with any CLI tool. """ import logging import time from abc import ABC, abstractmethod from contextlib import asynccontextmanager from typing import Any from fastmcp import Context logger = logging.getLogger(__name__) class LoggerInterceptor(ABC): """Abstract base class for CLI tool logger interception""" def __init__(self, context: Context, operation_id: str): """ Initialize logger interceptor Args: context: FastMCP context for logging and user interaction operation_id: Unique identifier for this operation """ self.context = context self.operation_id = operation_id self.operation_start_time = time.time() # Detect MCP client capabilities self.capabilities = self._detect_mcp_capabilities() # Operation state self.progress_history: list[dict[str, Any]] = [] self.user_confirmations: dict[str, bool] = {} self.active_stages: list[str] = [] logger.debug(f"Logger interceptor initialized for operation: {operation_id}") def _detect_mcp_capabilities(self) -> dict[str, bool]: """Detect available MCP client capabilities""" capabilities = { "logging": hasattr(self.context, "log") and callable(self.context.log), "progress": hasattr(self.context, "progress") and callable(self.context.progress), "elicitation": hasattr(self.context, "request_user_input") and callable(self.context.request_user_input), "sampling": hasattr(self.context, "sample") and callable(self.context.sample), } logger.debug(f"Detected MCP capabilities: {capabilities}") return capabilities @abstractmethod async def install_hooks(self) -> None: """Install middleware hooks into the target tool""" pass @abstractmethod async def remove_hooks(self) -> None: """Remove middleware hooks from the target tool""" pass @abstractmethod def get_interaction_points(self) -> list[str]: """Return list of operations that require user interaction""" pass @asynccontextmanager async def activate(self): """Context manager for middleware lifecycle""" try: await self.install_hooks() await self._log_operation_start() yield self except Exception as e: await self._log_error(f"Middleware activation failed: {e}") raise finally: await self._log_operation_end() await self.remove_hooks() # Enhanced logging methods async def _log_info(self, message: str, **kwargs) -> None: """Log informational message to MCP context""" if self.capabilities["logging"]: try: await self.context.log(level="info", message=message, **kwargs) except Exception as e: logger.warning(f"Failed to log info message: {e}") async def _log_warning(self, message: str, **kwargs) -> None: """Log warning message to MCP context""" if self.capabilities["logging"]: try: await self.context.log(level="warning", message=f"⚠️ {message}", **kwargs) except Exception as e: logger.warning(f"Failed to log warning message: {e}") async def _log_error(self, message: str, **kwargs) -> None: """Log error message to MCP context""" if self.capabilities["logging"]: try: await self.context.log(level="error", message=f"❌ {message}", **kwargs) except Exception as e: logger.error(f"Failed to log error message: {e}") async def _log_success(self, message: str, **kwargs) -> None: """Log success message to MCP context""" if self.capabilities["logging"]: try: await self.context.log(level="info", message=f"✅ {message}", **kwargs) except Exception as e: logger.warning(f"Failed to log success message: {e}") async def _update_progress( self, percentage: float, message: str = "", current: int | None = None, total: int | None = None, ) -> None: """Update operation progress""" if self.capabilities["progress"]: try: await self.context.progress( operation_id=self.operation_id, progress=percentage, total=total or 100, current=current or int(percentage), message=message, ) # Store progress history self.progress_history.append( { "timestamp": time.time(), "percentage": percentage, "message": message, "current": current, "total": total, } ) except Exception as e: logger.warning(f"Failed to update progress: {e}") async def _request_user_confirmation( self, prompt: str, default: bool = True, cache_key: str | None = None ) -> bool: """Request user confirmation with optional caching""" # Use cache key or prompt as key confirmation_key = cache_key or prompt # Check cache first if confirmation_key in self.user_confirmations: logger.debug(f"Using cached confirmation for: {confirmation_key}") return self.user_confirmations[confirmation_key] if self.capabilities["elicitation"]: try: response = await self.context.request_user_input( prompt=prompt, input_type="confirmation", additional_data={"default": default} ) confirmed = response.get("confirmed", default) self.user_confirmations[confirmation_key] = confirmed await self._log_info( f"User confirmation: {prompt} -> {'Yes' if confirmed else 'No'}" ) return confirmed except Exception as e: await self._log_warning(f"User confirmation failed: {e}") return default else: # No elicitation support, use default await self._log_info( f"Auto-confirming (no elicitation): {prompt} -> {'Yes' if default else 'No'}" ) return default async def _handle_stage_start(self, stage_message: str) -> None: """Handle stage start with potential user interaction""" self.active_stages.append(stage_message) await self._log_info(f"🔄 Starting: {stage_message}") # Check if this stage requires user confirmation if self._requires_user_interaction(stage_message): confirmed = await self._request_user_confirmation( f"🤔 About to: {stage_message}. Continue?", default=True, cache_key=f"stage_{stage_message}", ) if not confirmed: await self._log_error(f"Operation cancelled by user: {stage_message}") raise RuntimeError(f"User cancelled operation: {stage_message}") async def _handle_stage_end(self, stage_message: str | None = None) -> None: """Handle stage completion""" if self.active_stages: completed_stage = stage_message or self.active_stages.pop() await self._log_success(f"Completed: {completed_stage}") elif stage_message: await self._log_success(f"Completed: {stage_message}") def _requires_user_interaction(self, operation: str) -> bool: """Determine if operation requires user confirmation""" critical_keywords = [ "erase", "burn", "encrypt", "secure", "factory", "reset", "delete", "remove", "clear", "format", "destroy", ] operation_lower = operation.lower() return any(keyword in operation_lower for keyword in critical_keywords) def _format_message(self, message: str, *args) -> str: """Format message with optional arguments""" try: return message % args if args else message except (TypeError, ValueError): return f"{message} {' '.join(map(str, args))}" if args else message async def _log_operation_start(self) -> None: """Log operation start""" await self._log_info(f"🔧 Operation started: {self.operation_id}") async def _log_operation_end(self) -> None: """Log operation completion with statistics""" duration = time.time() - self.operation_start_time await self._log_info( f"⏱️ Operation completed: {self.operation_id} " f"(duration: {duration:.2f}s, " f"progress_updates: {len(self.progress_history)}, " f"confirmations: {len(self.user_confirmations)})" ) def get_operation_statistics(self) -> dict[str, Any]: """Get operation statistics for analysis""" duration = time.time() - self.operation_start_time return { "operation_id": self.operation_id, "duration_seconds": round(duration, 2), "progress_updates": len(self.progress_history), "user_confirmations": len(self.user_confirmations), "stages_completed": len(self.active_stages), "capabilities_used": [cap for cap, available in self.capabilities.items() if available], "start_time": self.operation_start_time, "end_time": time.time(), } class MiddlewareError(Exception): """Base exception for middleware-related errors""" pass class ToolNotFoundError(MiddlewareError): """Raised when target CLI tool is not found or available""" pass class HookInstallationError(MiddlewareError): """Raised when middleware hooks cannot be installed""" pass class UserCancellationError(MiddlewareError): """Raised when user cancels an operation""" pass