"""MCP Video Editor Server - Professional video production tools.""" import os from pathlib import Path from typing import Dict, List, Optional, Union from fastmcp import FastMCP from moviepy.audio.fx import volumex from moviepy.audio.io.AudioFileClip import AudioFileClip from moviepy.editor import ( CompositeVideoClip, TextClip, VideoFileClip, concatenate_videoclips, ) from moviepy.video.fx import speedx # Initialize FastMCP server mcp = FastMCP("MCP Video Editor") class VideoRecordingSession: """Manages video recording sessions.""" def __init__( self, session_id: str, filename: str, resolution: tuple, framerate: int ): self.session_id = session_id self.filename = filename self.resolution = resolution self.framerate = framerate self.is_recording = False self.output_path = None def start(self, output_dir: str = "./temp_videos"): """Start recording session.""" os.makedirs(output_dir, exist_ok=True) self.output_path = os.path.join(output_dir, self.filename) self.is_recording = True return self.output_path # Global recording sessions store recording_sessions: Dict[str, VideoRecordingSession] = {} @mcp.tool() def mcp_video_recorder_start( filename: str, resolution: str = "1920x1080", framerate: int = 30, region: Optional[str] = None, ) -> Dict[str, Union[str, int]]: """ Start reliable video capture with persistent recording sessions. Args: filename: Video output filename (e.g., "demo.mp4") resolution: Recording resolution (e.g., "1920x1080", "1280x720") framerate: Recording framerate (e.g., 30, 60) region: Optional screen region coordinates as "x,y,width,height" Returns: Dict with session_id for tracking and recording details """ import uuid session_id = str(uuid.uuid4()) width, height = map(int, resolution.split("x")) session = VideoRecordingSession(session_id, filename, (width, height), framerate) output_path = session.start() recording_sessions[session_id] = session return { "session_id": session_id, "filename": filename, "resolution": resolution, "framerate": framerate, "output_path": output_path, "status": "recording_started", } @mcp.tool() def mcp_video_recorder_stop(session_id: str) -> Dict[str, Union[str, int, float]]: """ Stop recording and ensure file is saved. Args: session_id: Recording session ID from start command Returns: Dict with file path and recording statistics """ if session_id not in recording_sessions: return {"error": f"Session {session_id} not found"} session = recording_sessions[session_id] session.is_recording = False # Simulate recording completion and file size calculation file_size = 0 duration = 0.0 if session.output_path and os.path.exists(session.output_path): file_size = os.path.getsize(session.output_path) try: with VideoFileClip(session.output_path) as clip: duration = clip.duration except: duration = 0.0 result = { "session_id": session_id, "output_path": session.output_path, "file_size_bytes": file_size, "duration_seconds": duration, "status": "recording_stopped", } # Clean up session del recording_sessions[session_id] return result @mcp.tool() def mcp_video_concatenate( input_clips: List[str], output_path: str, transition_type: str = "cut" ) -> Dict[str, Union[str, float]]: """ Join multiple video clips into single file. Args: input_clips: Array of video file paths to concatenate output_path: Combined video output location transition_type: Type of transition ("cut", "fade", "dissolve") Returns: Dict with output path and total duration """ try: clips = [] total_duration = 0.0 for clip_path in input_clips: if not os.path.exists(clip_path): return {"error": f"Input file not found: {clip_path}"} clip = VideoFileClip(clip_path) clips.append(clip) total_duration += clip.duration if not clips: return {"error": "No valid input clips provided"} # Apply transitions if specified if transition_type == "fade": # Add crossfade transitions between clips for i in range(1, len(clips)): clips[i] = clips[i].crossfadein(1.0) # Concatenate clips final_clip = concatenate_videoclips(clips, method="compose") final_clip.write_videofile(output_path, audio_codec="aac") # Clean up for clip in clips: clip.close() final_clip.close() return { "output_path": output_path, "total_duration": total_duration, "clips_count": len(input_clips), "status": "success", } except Exception as e: return {"error": f"Concatenation failed: {e!s}"} @mcp.tool() def mcp_video_trim( input_path: str, start_time: float, end_time: float, output_path: str ) -> Dict[str, Union[str, float]]: """ Cut video segments to specific timeframes. Args: input_path: Input video file path start_time: Start time in seconds end_time: End time in seconds output_path: Output video file path Returns: Dict with output path and trimmed duration """ try: if not os.path.exists(input_path): return {"error": f"Input file not found: {input_path}"} with VideoFileClip(input_path) as clip: if ( start_time >= clip.duration or end_time > clip.duration or start_time >= end_time ): return {"error": "Invalid time range specified"} trimmed_clip = clip.subclip(start_time, end_time) trimmed_clip.write_videofile(output_path, audio_codec="aac") duration = trimmed_clip.duration trimmed_clip.close() return { "output_path": output_path, "original_duration": clip.duration, "trimmed_duration": duration, "start_time": start_time, "end_time": end_time, "status": "success", } except Exception as e: return {"error": f"Trimming failed: {e!s}"} @mcp.tool() def mcp_video_speed_control( input_path: str, speed_multiplier: float, output_path: str, start_time: Optional[float] = None, end_time: Optional[float] = None, ) -> Dict[str, Union[str, float]]: """ Adjust playback speed for specific segments. Args: input_path: Input video file path speed_multiplier: Speed multiplier (0.5 = half speed, 2.0 = double speed) output_path: Output video file path start_time: Optional start time for speed change (entire video if not specified) end_time: Optional end time for speed change Returns: Dict with output path and new duration """ try: if not os.path.exists(input_path): return {"error": f"Input file not found: {input_path}"} with VideoFileClip(input_path) as clip: if start_time is not None and end_time is not None: # Apply speed change to specific segment before_clip = clip.subclip(0, start_time) if start_time > 0 else None speed_clip = clip.subclip(start_time, end_time).fx( speedx, speed_multiplier ) after_clip = ( clip.subclip(end_time) if end_time < clip.duration else None ) clips_to_concat = [ c for c in [before_clip, speed_clip, after_clip] if c is not None ] final_clip = concatenate_videoclips(clips_to_concat) else: # Apply speed change to entire video final_clip = clip.fx(speedx, speed_multiplier) final_clip.write_videofile(output_path, audio_codec="aac") new_duration = final_clip.duration final_clip.close() return { "output_path": output_path, "original_duration": clip.duration, "new_duration": new_duration, "speed_multiplier": speed_multiplier, "status": "success", } except Exception as e: return {"error": f"Speed control failed: {e!s}"} @mcp.tool() def mcp_video_add_overlay( input_path: str, output_path: str, overlay_type: str, text: Optional[str] = None, position: str = "center", duration: Optional[float] = None, start_time: float = 0, style: Optional[Dict] = None, ) -> Dict[str, str]: """ Add graphics, text, shapes over video content. Args: input_path: Input video file path output_path: Output video file path overlay_type: Type of overlay ("text", "image", "shape", "arrow") text: Text content (for text overlays) position: Position on screen ("center", "top-left", "bottom-right", etc.) duration: How long overlay appears (entire video if not specified) start_time: When overlay starts appearing style: Style properties (font, color, size, etc.) Returns: Dict with output path and overlay details """ try: if not os.path.exists(input_path): return {"error": f"Input file not found: {input_path}"} with VideoFileClip(input_path) as clip: if overlay_type == "text" and text: # Default style default_style = {"fontsize": 50, "color": "white", "font": "Arial-Bold"} if style: default_style.update(style) # Create text clip txt_clip = ( TextClip( text, fontsize=default_style["fontsize"], color=default_style["color"], font=default_style["font"], ) .set_position(position) .set_start(start_time) ) if duration: txt_clip = txt_clip.set_duration(duration) else: txt_clip = txt_clip.set_duration(clip.duration - start_time) # Composite video with text overlay final_clip = CompositeVideoClip([clip, txt_clip]) final_clip.write_videofile(output_path, audio_codec="aac") final_clip.close() else: return {"error": f"Overlay type '{overlay_type}' not implemented yet"} return { "output_path": output_path, "overlay_type": overlay_type, "position": position, "start_time": start_time, "duration": duration or (clip.duration - start_time), "status": "success", } except Exception as e: return {"error": f"Overlay addition failed: {e!s}"} @mcp.tool() def mcp_video_format_convert( input_path: str, output_path: str, output_format: str = "mp4", quality_preset: str = "balanced", compression_level: str = "medium", ) -> Dict[str, Union[str, int, float]]: """ Export to different video formats and qualities. Args: input_path: Input video file path output_path: Output video file path output_format: Target format ("mp4", "webm", "mov", "avi") quality_preset: Quality preset ("web-optimized", "high-quality", "mobile", "balanced") compression_level: Compression level ("low", "medium", "high") Returns: Dict with conversion results and file info """ try: if not os.path.exists(input_path): return {"error": f"Input file not found: {input_path}"} # Quality settings based on preset quality_settings = { "web-optimized": {"crf": 28, "preset": "medium"}, "high-quality": {"crf": 18, "preset": "slow"}, "mobile": {"crf": 32, "preset": "fast"}, "balanced": {"crf": 23, "preset": "medium"}, } settings = quality_settings.get(quality_preset, quality_settings["balanced"]) with VideoFileClip(input_path) as clip: # Write with specified format and quality codec_map = { "mp4": "libx264", "webm": "libvpx-vp9", "mov": "libx264", "avi": "libx264", } codec = codec_map.get(output_format.lower(), "libx264") clip.write_videofile( output_path, codec=codec, audio_codec="aac", preset=settings["preset"] ) original_size = os.path.getsize(input_path) converted_size = os.path.getsize(output_path) compression_ratio = ( converted_size / original_size if original_size > 0 else 0 ) return { "output_path": output_path, "output_format": output_format, "quality_preset": quality_preset, "original_size_bytes": original_size, "converted_size_bytes": converted_size, "compression_ratio": compression_ratio, "duration": clip.duration, "status": "success", } except Exception as e: return {"error": f"Format conversion failed: {e!s}"} @mcp.tool() def mcp_audio_mix_tracks( audio_files: List[str], output_path: str, volume_levels: Optional[List[float]] = None, sync_timing: Optional[List[float]] = None, ) -> Dict[str, Union[str, float, int]]: """ Combine multiple audio tracks with volume control and timing. Args: audio_files: List of audio file paths to mix output_path: Output audio file path volume_levels: Volume multipliers for each track (1.0 = original volume) sync_timing: Start times for each track in seconds Returns: Dict with output path and mixing details """ try: if not audio_files: return {"error": "No audio files provided"} audio_clips = [] total_duration = 0.0 for i, audio_path in enumerate(audio_files): if not os.path.exists(audio_path): return {"error": f"Audio file not found: {audio_path}"} clip = AudioFileClip(audio_path) # Apply volume adjustment if specified if volume_levels and i < len(volume_levels): clip = clip.fx(volumex, volume_levels[i]) # Apply timing offset if specified if sync_timing and i < len(sync_timing): clip = clip.set_start(sync_timing[i]) audio_clips.append(clip) clip_end_time = ( sync_timing[i] if sync_timing and i < len(sync_timing) else 0 ) + clip.duration total_duration = max(total_duration, clip_end_time) # Composite all audio clips from moviepy.audio.AudioClip import CompositeAudioClip final_audio = CompositeAudioClip(audio_clips) final_audio.write_audiofile(output_path) # Clean up for clip in audio_clips: clip.close() final_audio.close() return { "output_path": output_path, "total_duration": total_duration, "tracks_count": len(audio_files), "volume_levels": volume_levels or [1.0] * len(audio_files), "status": "success", } except Exception as e: return {"error": f"Audio mixing failed: {e!s}"} @mcp.tool() def mcp_audio_sync_video( video_path: str, audio_path: str, output_path: str, audio_start_time: float = 0.0, replace_audio: bool = True, ) -> Dict[str, Union[str, float]]: """ Synchronize audio track with video timeline. Args: video_path: Input video file path audio_path: Audio file to sync with video output_path: Output video file path audio_start_time: When audio should start in video timeline replace_audio: Whether to replace existing audio or mix with it Returns: Dict with output path and sync details """ try: if not os.path.exists(video_path): return {"error": f"Video file not found: {video_path}"} if not os.path.exists(audio_path): return {"error": f"Audio file not found: {audio_path}"} video_clip = VideoFileClip(video_path) audio_clip = AudioFileClip(audio_path).set_start(audio_start_time) if replace_audio: # Replace original audio with new audio final_clip = video_clip.set_audio(audio_clip) else: # Mix new audio with existing audio if video_clip.audio: from moviepy.audio.AudioClip import CompositeAudioClip mixed_audio = CompositeAudioClip([video_clip.audio, audio_clip]) final_clip = video_clip.set_audio(mixed_audio) else: final_clip = video_clip.set_audio(audio_clip) final_clip.write_videofile(output_path, audio_codec="aac") # Clean up video_clip.close() audio_clip.close() final_clip.close() return { "output_path": output_path, "video_duration": video_clip.duration, "audio_start_time": audio_start_time, "audio_duration": audio_clip.duration, "replace_audio": replace_audio, "status": "success", } except Exception as e: return {"error": f"Audio sync failed: {e!s}"} @mcp.tool() def mcp_video_add_branding( input_path: str, output_path: str, logo_path: Optional[str] = None, brand_colors: Optional[Dict] = None, position: str = "bottom-right", opacity: float = 0.8, size_scale: float = 0.1, ) -> Dict[str, Union[str, Dict]]: """ Apply consistent branding elements (logos, colors) to video. Args: input_path: Input video file path output_path: Output video file path logo_path: Path to logo image file brand_colors: Brand color scheme dict position: Logo position ("bottom-right", "top-left", "center", etc.) opacity: Logo opacity (0.0 to 1.0) size_scale: Logo size relative to video dimensions Returns: Dict with output path and branding details """ try: if not os.path.exists(input_path): return {"error": f"Input video file not found: {input_path}"} video_clip = VideoFileClip(input_path) if logo_path and os.path.exists(logo_path): # Add logo overlay from moviepy.editor import ImageClip logo_clip = ImageClip(logo_path, transparent=True) # Scale logo based on video dimensions video_width, video_height = video_clip.size logo_width = int(video_width * size_scale) logo_clip = logo_clip.resize(width=logo_width) # Position logo position_map = { "top-left": ("left", "top"), "top-right": ("right", "top"), "bottom-left": ("left", "bottom"), "bottom-right": ("right", "bottom"), "center": ("center", "center"), } pos = position_map.get(position, ("right", "bottom")) logo_clip = ( logo_clip.set_position(pos) .set_duration(video_clip.duration) .set_opacity(opacity) ) # Composite video with logo final_clip = CompositeVideoClip([video_clip, logo_clip]) else: final_clip = video_clip final_clip.write_videofile(output_path, audio_codec="aac") # Clean up video_clip.close() if logo_path: logo_clip.close() final_clip.close() return { "output_path": output_path, "logo_path": logo_path, "position": position, "opacity": opacity, "size_scale": size_scale, "brand_colors": brand_colors or {}, "status": "success", } except Exception as e: return {"error": f"Branding application failed: {e!s}"} @mcp.tool() def mcp_video_resolution_optimizer( input_path: str, output_directory: str, target_resolutions: List[str] = None, quality_settings: Optional[Dict] = None, ) -> Dict[str, Union[str, List, Dict]]: """ Generate multiple resolutions from source video. Args: input_path: Input video file path output_directory: Directory to save optimized versions target_resolutions: List of target resolutions (e.g., ["1080p", "720p", "480p"]) quality_settings: Quality settings for each resolution Returns: Dict with generated file paths and optimization details """ try: if not os.path.exists(input_path): return {"error": f"Input video file not found: {input_path}"} os.makedirs(output_directory, exist_ok=True) if target_resolutions is None: target_resolutions = ["1080p", "720p", "480p"] resolution_map = { "1080p": (1920, 1080), "720p": (1280, 720), "480p": (854, 480), "360p": (640, 360), } video_clip = VideoFileClip(input_path) original_size = video_clip.size base_filename = Path(input_path).stem generated_files = [] for res in target_resolutions: if res not in resolution_map: continue target_width, target_height = resolution_map[res] # Skip if target resolution is larger than original if target_width > original_size[0] or target_height > original_size[1]: continue # Resize video resized_clip = video_clip.resize((target_width, target_height)) # Generate output filename output_filename = f"{base_filename}_{res}.mp4" output_path = os.path.join(output_directory, output_filename) # Write resized video resized_clip.write_videofile(output_path, audio_codec="aac") generated_files.append( { "resolution": res, "dimensions": f"{target_width}x{target_height}", "output_path": output_path, "file_size": os.path.getsize(output_path), } ) resized_clip.close() video_clip.close() return { "input_path": input_path, "output_directory": output_directory, "original_resolution": f"{original_size[0]}x{original_size[1]}", "generated_files": generated_files, "total_files": len(generated_files), "status": "success", } except Exception as e: return {"error": f"Resolution optimization failed: {e!s}"} def main(): """Main entry point for the MCP Video Editor server.""" mcp.run() if __name__ == "__main__": main()