"""MCP Video Editor Server - Professional video production tools."""

import os
import uuid
from pathlib import Path

from fastmcp import FastMCP

# Optional imports for video processing.
# NOTE: the star import targets MoviePy 2.x, which exposes VideoFileClip,
# AudioFileClip, ImageClip, TextClip, CompositeVideoClip, CompositeAudioClip,
# concatenate_videoclips, vfx and afx at the package top level
# (the `moviepy.editor` module was removed in 2.0).
try:
    from moviepy import *  # noqa: F403

    MOVIEPY_AVAILABLE = True
except ImportError as e:
    print(f"MoviePy import error: {e}")
    MOVIEPY_AVAILABLE = False

try:
    import cv2
    import numpy as np

    CV2_AVAILABLE = True
except ImportError:
    CV2_AVAILABLE = False

try:
    import ffmpeg

    FFMPEG_AVAILABLE = True
except ImportError:
    FFMPEG_AVAILABLE = False

# Initialize FastMCP server
mcp = FastMCP("MCP Video Editor")


def _check_moviepy_dependency():
    """Return an error dict if MoviePy is unavailable, else None.

    Every tool that touches MoviePy calls this first so a missing optional
    dependency produces a clean error payload instead of a NameError.
    """
    if not MOVIEPY_AVAILABLE:
        return {"error": "MoviePy is not installed. Install with: pip install moviepy"}
    return None


class VideoRecordingSession:
    """Manages video recording sessions."""

    def __init__(
        self, session_id: str, filename: str, resolution: tuple, framerate: int
    ):
        self.session_id = session_id
        self.filename = filename
        self.resolution = resolution  # (width, height) in pixels
        self.framerate = framerate
        self.is_recording = False
        self.output_path = None  # set by start()

    def start(self, output_dir: str = "./temp_videos"):
        """Start recording session and return the target output path."""
        os.makedirs(output_dir, exist_ok=True)
        self.output_path = os.path.join(output_dir, self.filename)
        self.is_recording = True
        return self.output_path


# Global recording sessions store, keyed by session_id
recording_sessions: dict[str, VideoRecordingSession] = {}


@mcp.tool()
def mcp_video_editor_status() -> dict[str, str | bool | int]:
    """
    Get the status of the MCP Video Editor server and available capabilities.

    Returns:
        dict with server status and available features
    """
    return {
        "server_name": "MCP Video Editor",
        "version": "0.1.0",
        "status": "running",
        "moviepy_available": MOVIEPY_AVAILABLE,
        "opencv_available": CV2_AVAILABLE,
        "ffmpeg_available": FFMPEG_AVAILABLE,
        "active_sessions": len(recording_sessions),
    }


@mcp.tool()
def mcp_video_recorder_start(
    filename: str,
    resolution: str = "1920x1080",
    framerate: int = 30,
    region: str | None = None,
) -> dict[str, str | int]:
    """
    Start reliable video capture with persistent recording sessions.

    Args:
        filename: Video output filename (e.g., "demo.mp4")
        resolution: Recording resolution (e.g., "1920x1080", "1280x720")
        framerate: Recording framerate (e.g., 30, 60)
        region: Optional screen region coordinates as "x,y,width,height"

    Returns:
        dict with session_id for tracking and recording details
    """
    session_id = str(uuid.uuid4())
    width, height = map(int, resolution.split("x"))

    session = VideoRecordingSession(session_id, filename, (width, height), framerate)
    output_path = session.start()
    recording_sessions[session_id] = session

    return {
        "session_id": session_id,
        "filename": filename,
        "resolution": resolution,
        "framerate": framerate,
        "output_path": output_path,
        "status": "recording_started",
    }


@mcp.tool()
def mcp_video_recorder_stop(session_id: str) -> dict[str, str | int | float]:
    """
    Stop recording and ensure file is saved.

    Args:
        session_id: Recording session ID from start command

    Returns:
        dict with file path and recording statistics
    """
    if session_id not in recording_sessions:
        return {"error": f"Session {session_id} not found"}

    session = recording_sessions[session_id]
    session.is_recording = False

    # Gather file statistics if the output file actually exists.
    file_size = 0
    duration = 0.0
    if session.output_path and os.path.exists(session.output_path):
        file_size = os.path.getsize(session.output_path)
        if MOVIEPY_AVAILABLE:
            try:
                with VideoFileClip(session.output_path) as clip:
                    duration = clip.duration
            except Exception:
                # Best-effort probe only; an unreadable file reports 0.0.
                duration = 0.0

    result = {
        "session_id": session_id,
        "output_path": session.output_path,
        "file_size_bytes": file_size,
        "duration_seconds": duration,
        "status": "recording_stopped",
    }

    # Clean up session
    del recording_sessions[session_id]
    return result


@mcp.tool()
def mcp_video_concatenate(
    input_clips: list[str], output_path: str, transition_type: str = "cut"
) -> dict[str, str | float]:
    """
    Join multiple video clips into single file.

    Args:
        input_clips: Array of video file paths to concatenate
        output_path: Combined video output location
        transition_type: Type of transition ("cut", "fade", "dissolve")

    Returns:
        dict with output path and total duration
    """
    dep_error = _check_moviepy_dependency()
    if dep_error:
        return dep_error
    try:
        clips = []
        total_duration = 0.0

        for clip_path in input_clips:
            if not os.path.exists(clip_path):
                return {"error": f"Input file not found: {clip_path}"}
            clip = VideoFileClip(clip_path)
            clips.append(clip)
            total_duration += clip.duration

        if not clips:
            return {"error": "No valid input clips provided"}

        # Apply transitions if specified (MoviePy 2.x effect objects)
        if transition_type == "fade":
            # Add crossfade transitions between clips
            for i in range(1, len(clips)):
                clips[i] = clips[i].with_effects([vfx.CrossFadeIn(1.0)])

        # Concatenate clips
        final_clip = concatenate_videoclips(clips, method="compose")
        final_clip.write_videofile(output_path, audio_codec="aac")

        # Clean up
        for clip in clips:
            clip.close()
        final_clip.close()

        return {
            "output_path": output_path,
            "total_duration": total_duration,
            "clips_count": len(input_clips),
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Concatenation failed: {e!s}"}


@mcp.tool()
def mcp_video_trim(
    input_path: str, start_time: float, end_time: float, output_path: str
) -> dict[str, str | float]:
    """
    Cut video segments to specific timeframes.

    Args:
        input_path: Input video file path
        start_time: Start time in seconds
        end_time: End time in seconds
        output_path: Output video file path

    Returns:
        dict with output path and trimmed duration
    """
    dep_error = _check_moviepy_dependency()
    if dep_error:
        return dep_error
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input file not found: {input_path}"}

        with VideoFileClip(input_path) as clip:
            if (
                start_time < 0
                or start_time >= clip.duration
                or end_time > clip.duration
                or start_time >= end_time
            ):
                return {"error": "Invalid time range specified"}

            trimmed_clip = clip.subclipped(start_time, end_time)
            trimmed_clip.write_videofile(output_path, audio_codec="aac")

            # Capture values before the clips are closed.
            original_duration = clip.duration
            duration = trimmed_clip.duration
            trimmed_clip.close()

        return {
            "output_path": output_path,
            "original_duration": original_duration,
            "trimmed_duration": duration,
            "start_time": start_time,
            "end_time": end_time,
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Trimming failed: {e!s}"}


@mcp.tool()
def mcp_video_speed_control(
    input_path: str,
    speed_multiplier: float,
    output_path: str,
    start_time: float | None = None,
    end_time: float | None = None,
) -> dict[str, str | float]:
    """
    Adjust playback speed for specific segments.

    Args:
        input_path: Input video file path
        speed_multiplier: Speed multiplier (0.5 = half speed, 2.0 = double speed)
        output_path: Output video file path
        start_time: Optional start time for speed change (entire video if not specified)
        end_time: Optional end time for speed change

    Returns:
        dict with output path and new duration
    """
    dep_error = _check_moviepy_dependency()
    if dep_error:
        return dep_error
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input file not found: {input_path}"}

        with VideoFileClip(input_path) as clip:
            speed_effect = vfx.MultiplySpeed(speed_multiplier)
            if start_time is not None and end_time is not None:
                # Apply speed change to a specific segment only: split the
                # video into before/speeded/after parts and rejoin them.
                before_clip = clip.subclipped(0, start_time) if start_time > 0 else None
                speed_clip = clip.subclipped(start_time, end_time).with_effects(
                    [speed_effect]
                )
                after_clip = (
                    clip.subclipped(end_time) if end_time < clip.duration else None
                )

                clips_to_concat = [
                    c for c in [before_clip, speed_clip, after_clip] if c is not None
                ]
                final_clip = concatenate_videoclips(clips_to_concat)
            else:
                # Apply speed change to entire video
                final_clip = clip.with_effects([speed_effect])

            final_clip.write_videofile(output_path, audio_codec="aac")

            # Capture durations before the clips are closed.
            original_duration = clip.duration
            new_duration = final_clip.duration
            final_clip.close()

        return {
            "output_path": output_path,
            "original_duration": original_duration,
            "new_duration": new_duration,
            "speed_multiplier": speed_multiplier,
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Speed control failed: {e!s}"}


@mcp.tool()
def mcp_video_add_overlay(
    input_path: str,
    output_path: str,
    overlay_type: str,
    text: str | None = None,
    position: str = "center",
    duration: float | None = None,
    start_time: float = 0,
    style: dict | None = None,
) -> dict[str, str]:
    """
    Add graphics, text, shapes over video content.

    Args:
        input_path: Input video file path
        output_path: Output video file path
        overlay_type: Type of overlay ("text", "image", "shape", "arrow")
        text: Text content (for text overlays)
        position: Position on screen ("center", "top-left", "bottom-right", etc.)
        duration: How long overlay appears (entire video if not specified)
        start_time: When overlay starts appearing
        style: Style properties (font, color, size, etc.)

    Returns:
        dict with output path and overlay details
    """
    dep_error = _check_moviepy_dependency()
    if dep_error:
        return dep_error
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input file not found: {input_path}"}

        with VideoFileClip(input_path) as clip:
            if overlay_type == "text" and text:
                # Default style, overridable by caller-supplied values.
                default_style = {"font_size": 50, "color": "white"}
                if style:
                    default_style.update(style)

                overlay_duration = duration or (clip.duration - start_time)

                # Create text clip (MoviePy 2.x keyword syntax)
                txt_clip = (
                    TextClip(
                        text=text,
                        font_size=default_style["font_size"],
                        color=default_style["color"],
                        duration=overlay_duration,
                    )
                    .with_position(position)
                    .with_start(start_time)
                )

                # Composite video with text overlay
                final_clip = CompositeVideoClip([clip, txt_clip])
                final_clip.write_videofile(output_path, audio_codec="aac")
                final_clip.close()
            else:
                return {"error": f"Overlay type '{overlay_type}' not implemented yet"}

        return {
            "output_path": output_path,
            "overlay_type": overlay_type,
            "position": position,
            "start_time": start_time,
            "duration": overlay_duration,
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Overlay addition failed: {e!s}"}


@mcp.tool()
def mcp_video_format_convert(
    input_path: str,
    output_path: str,
    output_format: str = "mp4",
    quality_preset: str = "balanced",
    compression_level: str = "medium",
) -> dict[str, str | int | float]:
    """
    Export to different video formats and qualities.

    Args:
        input_path: Input video file path
        output_path: Output video file path
        output_format: Target format ("mp4", "webm", "mov", "avi")
        quality_preset: Quality preset ("web-optimized", "high-quality", "mobile", "balanced")
        compression_level: Compression level ("low", "medium", "high")

    Returns:
        dict with conversion results and file info
    """
    dep_error = _check_moviepy_dependency()
    if dep_error:
        return dep_error
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input file not found: {input_path}"}

        # Quality settings based on preset (CRF + x264 speed preset)
        quality_settings = {
            "web-optimized": {"crf": 28, "preset": "medium"},
            "high-quality": {"crf": 18, "preset": "slow"},
            "mobile": {"crf": 32, "preset": "fast"},
            "balanced": {"crf": 23, "preset": "medium"},
        }
        settings = quality_settings.get(quality_preset, quality_settings["balanced"])

        with VideoFileClip(input_path) as clip:
            # Map container format to an encoder codec
            codec_map = {
                "mp4": "libx264",
                "webm": "libvpx-vp9",
                "mov": "libx264",
                "avi": "libx264",
            }
            codec = codec_map.get(output_format.lower(), "libx264")

            clip.write_videofile(
                output_path, codec=codec, audio_codec="aac", preset=settings["preset"]
            )
            duration = clip.duration

        original_size = os.path.getsize(input_path)
        converted_size = os.path.getsize(output_path)
        compression_ratio = (
            converted_size / original_size if original_size > 0 else 0
        )

        return {
            "output_path": output_path,
            "output_format": output_format,
            "quality_preset": quality_preset,
            "original_size_bytes": original_size,
            "converted_size_bytes": converted_size,
            "compression_ratio": compression_ratio,
            "duration": duration,
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Format conversion failed: {e!s}"}


@mcp.tool()
def mcp_audio_mix_tracks(
    audio_files: list[str],
    output_path: str,
    volume_levels: list[float] | None = None,
    sync_timing: list[float] | None = None,
) -> dict[str, str | float | int]:
    """
    Combine multiple audio tracks with volume control and timing.

    Args:
        audio_files: list of audio file paths to mix
        output_path: Output audio file path
        volume_levels: Volume multipliers for each track (1.0 = original volume)
        sync_timing: Start times for each track in seconds

    Returns:
        dict with output path and mixing details
    """
    dep_error = _check_moviepy_dependency()
    if dep_error:
        return dep_error
    try:
        if not audio_files:
            return {"error": "No audio files provided"}

        audio_clips = []
        total_duration = 0.0

        for i, audio_path in enumerate(audio_files):
            if not os.path.exists(audio_path):
                return {"error": f"Audio file not found: {audio_path}"}

            clip = AudioFileClip(audio_path)

            # Apply volume adjustment if specified
            if volume_levels and i < len(volume_levels):
                clip = clip.with_effects([afx.MultiplyVolume(volume_levels[i])])

            # Apply timing offset if specified
            if sync_timing and i < len(sync_timing):
                clip = clip.with_start(sync_timing[i])

            audio_clips.append(clip)
            # Track the latest end time across all offset tracks.
            clip_end_time = (
                sync_timing[i] if sync_timing and i < len(sync_timing) else 0
            ) + clip.duration
            total_duration = max(total_duration, clip_end_time)

        # Composite all audio clips into one track
        final_audio = CompositeAudioClip(audio_clips)
        final_audio.write_audiofile(output_path)

        # Clean up
        for clip in audio_clips:
            clip.close()
        final_audio.close()

        return {
            "output_path": output_path,
            "total_duration": total_duration,
            "tracks_count": len(audio_files),
            "volume_levels": volume_levels or [1.0] * len(audio_files),
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Audio mixing failed: {e!s}"}


@mcp.tool()
def mcp_audio_sync_video(
    video_path: str,
    audio_path: str,
    output_path: str,
    audio_start_time: float = 0.0,
    replace_audio: bool = True,
) -> dict[str, str | float]:
    """
    Synchronize audio track with video timeline.

    Args:
        video_path: Input video file path
        audio_path: Audio file to sync with video
        output_path: Output video file path
        audio_start_time: When audio should start in video timeline
        replace_audio: Whether to replace existing audio or mix with it

    Returns:
        dict with output path and sync details
    """
    dep_error = _check_moviepy_dependency()
    if dep_error:
        return dep_error
    try:
        if not os.path.exists(video_path):
            return {"error": f"Video file not found: {video_path}"}
        if not os.path.exists(audio_path):
            return {"error": f"Audio file not found: {audio_path}"}

        video_clip = VideoFileClip(video_path)
        audio_clip = AudioFileClip(audio_path).with_start(audio_start_time)

        if replace_audio:
            # Replace original audio with new audio
            final_clip = video_clip.with_audio(audio_clip)
        else:
            # Mix new audio with existing audio (if the video has any)
            if video_clip.audio:
                mixed_audio = CompositeAudioClip([video_clip.audio, audio_clip])
                final_clip = video_clip.with_audio(mixed_audio)
            else:
                final_clip = video_clip.with_audio(audio_clip)

        final_clip.write_videofile(output_path, audio_codec="aac")

        # Capture durations before closing the clips.
        video_duration = video_clip.duration
        audio_duration = audio_clip.duration

        # Clean up
        video_clip.close()
        audio_clip.close()
        final_clip.close()

        return {
            "output_path": output_path,
            "video_duration": video_duration,
            "audio_start_time": audio_start_time,
            "audio_duration": audio_duration,
            "replace_audio": replace_audio,
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Audio sync failed: {e!s}"}


@mcp.tool()
def mcp_video_add_branding(
    input_path: str,
    output_path: str,
    logo_path: str | None = None,
    brand_colors: dict | None = None,
    position: str = "bottom-right",
    opacity: float = 0.8,
    size_scale: float = 0.1,
) -> dict[str, str | dict]:
    """
    Apply consistent branding elements (logos, colors) to video.

    Args:
        input_path: Input video file path
        output_path: Output video file path
        logo_path: Path to logo image file
        brand_colors: Brand color scheme dict
        position: Logo position ("bottom-right", "top-left", "center", etc.)
        opacity: Logo opacity (0.0 to 1.0)
        size_scale: Logo size relative to video dimensions

    Returns:
        dict with output path and branding details
    """
    dep_error = _check_moviepy_dependency()
    if dep_error:
        return dep_error
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input video file not found: {input_path}"}

        video_clip = VideoFileClip(input_path)
        logo_clip = None  # tracked so cleanup only closes a clip that exists

        if logo_path and os.path.exists(logo_path):
            # Add logo overlay
            logo_clip = ImageClip(logo_path, transparent=True)

            # Scale logo based on video dimensions
            video_width, video_height = video_clip.size
            logo_width = int(video_width * size_scale)
            logo_clip = logo_clip.resized(width=logo_width)

            # Position logo
            position_map = {
                "top-left": ("left", "top"),
                "top-right": ("right", "top"),
                "bottom-left": ("left", "bottom"),
                "bottom-right": ("right", "bottom"),
                "center": ("center", "center"),
            }
            pos = position_map.get(position, ("right", "bottom"))
            logo_clip = (
                logo_clip.with_position(pos)
                .with_duration(video_clip.duration)
                .with_opacity(opacity)
            )

            # Composite video with logo
            final_clip = CompositeVideoClip([video_clip, logo_clip])
        else:
            final_clip = video_clip

        final_clip.write_videofile(output_path, audio_codec="aac")

        # Clean up
        video_clip.close()
        if logo_clip is not None:
            logo_clip.close()
        final_clip.close()

        return {
            "output_path": output_path,
            "logo_path": logo_path,
            "position": position,
            "opacity": opacity,
            "size_scale": size_scale,
            "brand_colors": brand_colors or {},
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Branding application failed: {e!s}"}


@mcp.tool()
def mcp_video_resolution_optimizer(
    input_path: str,
    output_directory: str,
    target_resolutions: list[str] | None = None,
    quality_settings: dict | None = None,
) -> dict[str, str | list | dict]:
    """
    Generate multiple resolutions from source video.

    Args:
        input_path: Input video file path
        output_directory: Directory to save optimized versions
        target_resolutions: list of target resolutions (e.g., ["1080p", "720p", "480p"])
        quality_settings: Quality settings for each resolution

    Returns:
        dict with generated file paths and optimization details
    """
    dep_error = _check_moviepy_dependency()
    if dep_error:
        return dep_error
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input video file not found: {input_path}"}

        os.makedirs(output_directory, exist_ok=True)

        if target_resolutions is None:
            target_resolutions = ["1080p", "720p", "480p"]

        resolution_map = {
            "1080p": (1920, 1080),
            "720p": (1280, 720),
            "480p": (854, 480),
            "360p": (640, 360),
        }

        video_clip = VideoFileClip(input_path)
        original_size = video_clip.size
        base_filename = Path(input_path).stem
        generated_files = []

        for res in target_resolutions:
            if res not in resolution_map:
                continue
            target_width, target_height = resolution_map[res]

            # Skip upscaling: never generate a resolution larger than the source.
            if target_width > original_size[0] or target_height > original_size[1]:
                continue

            # Resize video
            resized_clip = video_clip.resized((target_width, target_height))

            # Generate output filename
            output_filename = f"{base_filename}_{res}.mp4"
            output_path = os.path.join(output_directory, output_filename)

            # Write resized video
            resized_clip.write_videofile(output_path, audio_codec="aac")

            generated_files.append(
                {
                    "resolution": res,
                    "dimensions": f"{target_width}x{target_height}",
                    "output_path": output_path,
                    "file_size": os.path.getsize(output_path),
                }
            )
            resized_clip.close()

        video_clip.close()

        return {
            "input_path": input_path,
            "output_directory": output_directory,
            "original_resolution": f"{original_size[0]}x{original_size[1]}",
            "generated_files": generated_files,
            "total_files": len(generated_files),
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Resolution optimization failed: {e!s}"}


def main():
    """Main entry point for the MCP Video Editor server."""
    mcp.run()


if __name__ == "__main__":
    main()