"""Screenshot mixin for Android ADB MCP Server. Provides tools for screen capture and display information. """ from datetime import datetime from pathlib import Path from typing import Any from fastmcp import Context from fastmcp.contrib.mcp_mixin import mcp_resource, mcp_tool from ..config import get_config, is_developer_mode from ..models import ( ActionResult, RecordingResult, ScreenDensityResult, ScreenSetResult, ScreenshotResult, ScreenSizeResult, ) from .base import ADBBaseMixin class ScreenshotMixin(ADBBaseMixin): """Mixin for Android screen capture. Provides tools for: - Taking screenshots - Getting screen dimensions - Screen recording (developer mode) """ @mcp_tool() async def screenshot( self, ctx: Context, filename: str | None = None, device_id: str | None = None, ) -> ScreenshotResult: """Take a screenshot of the device screen. Captures the current screen and saves it locally as a PNG file. Args: ctx: MCP context for logging filename: Output filename (default: screenshot_YYYYMMDD_HHMMSS.png) device_id: Target device Returns: ScreenshotResult with success status and file path """ await ctx.info("Capturing screenshot...") # Generate default filename with timestamp if not filename: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"screenshot_{timestamp}.png" # Use configured screenshot directory if set config = get_config() if config.default_screenshot_dir: output_path = Path(config.default_screenshot_dir) / filename else: output_path = Path(filename).absolute() # Ensure parent directory exists output_path.parent.mkdir(parents=True, exist_ok=True) # Take screenshot on device device_temp = "/sdcard/adb_mcp_screenshot.png" result = await self.run_shell_args(["screencap", "-p", device_temp], device_id) if not result.success: await ctx.error(f"Screenshot capture failed: {result.stderr}") return ScreenshotResult( success=False, error=f"Failed to capture screenshot: {result.stderr}", ) await ctx.info("Transferring screenshot to host...") # Pull to local machine pull_result = await self.run_adb( ["pull", device_temp, str(output_path)], device_id ) if not pull_result.success: await ctx.error(f"Screenshot transfer failed: {pull_result.stderr}") return ScreenshotResult( success=False, error=f"Failed to pull screenshot: {pull_result.stderr}", ) # Clean up device temp file await self.run_shell_args(["rm", device_temp], device_id) await ctx.info(f"Screenshot saved: {output_path}") return ScreenshotResult( success=True, local_path=str(output_path), ) @mcp_tool() async def screen_size( self, device_id: str | None = None, ) -> ScreenSizeResult: """Get the screen dimensions. Returns the physical screen resolution in pixels. Args: device_id: Target device Returns: Screen width and height """ result = await self.run_shell_args(["wm", "size"], device_id) if result.success: # Parse "Physical size: 1080x1920" for line in result.stdout.split("\n"): if "Physical size" in line or "Override size" in line: parts = line.split(":") if len(parts) == 2: size = parts[1].strip() if "x" in size: w, h = size.split("x") return ScreenSizeResult( success=True, width=int(w), height=int(h), raw=result.stdout, ) return ScreenSizeResult( success=False, error=result.stderr or "Could not parse screen size", raw=result.stdout, ) @mcp_tool() async def screen_density( self, device_id: str | None = None, ) -> ScreenDensityResult: """Get the screen density (DPI). Args: device_id: Target device Returns: Screen density in DPI """ result = await self.run_shell_args(["wm", "density"], device_id) if result.success: for line in result.stdout.split("\n"): if "Physical density" in line or "Override density" in line: parts = line.split(":") if len(parts) == 2: try: dpi = int(parts[1].strip()) return ScreenDensityResult( success=True, dpi=dpi, ) except ValueError: pass return ScreenDensityResult( success=False, error=result.stderr or "Could not parse density", raw=result.stdout, ) @mcp_tool() async def screen_on( self, device_id: str | None = None, ) -> ActionResult: """Turn the screen on. Wakes up the device display. Does not unlock. Args: device_id: Target device Returns: Success status """ result = await self.run_shell_args( ["input", "keyevent", "KEYCODE_WAKEUP"], device_id ) return ActionResult( success=result.success, action="screen_on", error=result.stderr if not result.success else None, ) @mcp_tool() async def screen_off( self, device_id: str | None = None, ) -> ActionResult: """Turn the screen off. Puts the device display to sleep. Args: device_id: Target device Returns: Success status """ result = await self.run_shell_args( ["input", "keyevent", "KEYCODE_SLEEP"], device_id ) return ActionResult( success=result.success, action="screen_off", error=result.stderr if not result.success else None, ) # === Developer Mode Tools === @mcp_tool( tags={"developer"}, annotations={"requires": "developer_mode"}, ) async def screen_record( self, ctx: Context, filename: str | None = None, duration_seconds: int = 10, device_id: str | None = None, ) -> RecordingResult: """Record the screen. [DEVELOPER MODE] Records the device screen to a video file. Recording runs for the specified duration. Args: ctx: MCP context for logging filename: Output filename (default: recording_YYYYMMDD_HHMMSS.mp4) duration_seconds: Recording duration (max 180 seconds) device_id: Target device Returns: Recording result with file path """ if not is_developer_mode(): return RecordingResult( success=False, error="Developer mode required", ) # Generate default filename if not filename: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") filename = f"recording_{timestamp}.mp4" # Use configured directory config = get_config() if config.default_screenshot_dir: output_path = Path(config.default_screenshot_dir) / filename else: output_path = Path(filename).absolute() output_path.parent.mkdir(parents=True, exist_ok=True) # Limit duration duration = min(duration_seconds, 180) await ctx.info(f"Recording screen for {duration}s...") # Record on device — uses dedicated timeout for recording duration device_temp = "/sdcard/adb_mcp_recording.mp4" result = await self.run_shell_args( [ "screenrecord", "--time-limit", str(duration), device_temp, ], device_id, timeout=duration + 10, # Extra margin for command overhead ) if not result.success: return RecordingResult( success=False, error=f"Failed to record: {result.stderr}", ) await ctx.info("Transferring recording to host...") # Pull to local pull_result = await self.run_adb( ["pull", device_temp, str(output_path)], device_id ) # Clean up await self.run_shell_args(["rm", device_temp], device_id) if not pull_result.success: return RecordingResult( success=False, error=f"Failed to pull recording: {pull_result.stderr}", ) await ctx.info(f"Recording saved: {output_path}") return RecordingResult( success=True, local_path=str(output_path), duration_seconds=duration, ) @mcp_tool( tags={"developer"}, annotations={"requires": "developer_mode"}, ) async def screen_set_size( self, width: int, height: int, device_id: str | None = None, ) -> ScreenSetResult: """Override screen resolution. [DEVELOPER MODE] Changes the display resolution. Use screen_reset_size to restore original. Args: width: New width in pixels height: New height in pixels device_id: Target device Returns: Success status """ if not is_developer_mode(): return ScreenSetResult( success=False, action="set_size", error="Developer mode required", ) result = await self.run_shell_args( ["wm", "size", f"{width}x{height}"], device_id ) return ScreenSetResult( success=result.success, action="set_size", width=width, height=height, error=result.stderr if not result.success else None, ) @mcp_tool( tags={"developer"}, annotations={"requires": "developer_mode"}, ) async def screen_reset_size( self, device_id: str | None = None, ) -> ActionResult: """Reset screen to physical resolution. [DEVELOPER MODE] Restores the original display resolution. Args: device_id: Target device Returns: Success status """ if not is_developer_mode(): return ActionResult( success=False, action="reset_size", error="Developer mode required", ) result = await self.run_shell_args(["wm", "size", "reset"], device_id) return ActionResult( success=result.success, action="reset_size", error=result.stderr if not result.success else None, ) # === Resources === @mcp_resource(uri="adb://screen/info") async def resource_screen_info(self) -> dict[str, Any]: """Resource: Get screen information.""" size = await self.screen_size() density = await self.screen_density() return { "width": size.width, "height": size.height, "dpi": density.dpi, }