Ryan Malloy d7b56da7c7 Fix import errors and type annotations
- Replace all Dict/List type hints with dict/list for Python 3.12+ compatibility
- Add __main__.py to make package executable as module
- Remove unused imports (cv2, numpy, ffmpeg) that were causing warnings
- Fix bare except clause to use Exception
- Server now starts successfully without import errors
2025-09-05 02:56:55 -06:00

793 lines
24 KiB
Python

"""MCP Video Editor Server - Professional video production tools."""
import os
from pathlib import Path
from fastmcp import FastMCP
# Optional imports for video processing
try:
from moviepy.audio.fx import volumex
from moviepy.audio.io.AudioFileClip import AudioFileClip
from moviepy.editor import (
CompositeVideoClip,
TextClip,
VideoFileClip,
concatenate_videoclips,
)
from moviepy.video.fx import speedx
MOVIEPY_AVAILABLE = True
except ImportError:
MOVIEPY_AVAILABLE = False
try:
import cv2
import numpy as np
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
try:
import ffmpeg
FFMPEG_AVAILABLE = True
except ImportError:
FFMPEG_AVAILABLE = False
# Initialize FastMCP server
mcp = FastMCP("MCP Video Editor")
def _check_moviepy_dependency():
"""Check if MoviePy is available for video processing."""
if not MOVIEPY_AVAILABLE:
return {"error": "MoviePy is not installed. Install with: pip install moviepy"}
return None
class VideoRecordingSession:
"""Manages video recording sessions."""
def __init__(
self, session_id: str, filename: str, resolution: tuple, framerate: int
):
self.session_id = session_id
self.filename = filename
self.resolution = resolution
self.framerate = framerate
self.is_recording = False
self.output_path = None
def start(self, output_dir: str = "./temp_videos"):
"""Start recording session."""
os.makedirs(output_dir, exist_ok=True)
self.output_path = os.path.join(output_dir, self.filename)
self.is_recording = True
return self.output_path
# Global recording sessions store
recording_sessions: dict[str, VideoRecordingSession] = {}
@mcp.tool()
def mcp_video_editor_status() -> dict[str, str | bool | int]:
"""
Get the status of the MCP Video Editor server and available capabilities.
Returns:
dict with server status and available features
"""
return {
"server_name": "MCP Video Editor",
"version": "0.1.0",
"status": "running",
"moviepy_available": MOVIEPY_AVAILABLE,
"opencv_available": CV2_AVAILABLE,
"ffmpeg_available": FFMPEG_AVAILABLE,
"active_sessions": len(recording_sessions),
}
@mcp.tool()
def mcp_video_recorder_start(
filename: str,
resolution: str = "1920x1080",
framerate: int = 30,
region: str | None = None,
) -> dict[str, str | int]:
"""
Start reliable video capture with persistent recording sessions.
Args:
filename: Video output filename (e.g., "demo.mp4")
resolution: Recording resolution (e.g., "1920x1080", "1280x720")
framerate: Recording framerate (e.g., 30, 60)
region: Optional screen region coordinates as "x,y,width,height"
Returns:
dict with session_id for tracking and recording details
"""
import uuid
session_id = str(uuid.uuid4())
width, height = map(int, resolution.split("x"))
session = VideoRecordingSession(session_id, filename, (width, height), framerate)
output_path = session.start()
recording_sessions[session_id] = session
return {
"session_id": session_id,
"filename": filename,
"resolution": resolution,
"framerate": framerate,
"output_path": output_path,
"status": "recording_started",
}
@mcp.tool()
def mcp_video_recorder_stop(session_id: str) -> dict[str, str | int | float]:
"""
Stop recording and ensure file is saved.
Args:
session_id: Recording session ID from start command
Returns:
dict with file path and recording statistics
"""
if session_id not in recording_sessions:
return {"error": f"Session {session_id} not found"}
session = recording_sessions[session_id]
session.is_recording = False
# Simulate recording completion and file size calculation
file_size = 0
duration = 0.0
if session.output_path and os.path.exists(session.output_path):
file_size = os.path.getsize(session.output_path)
try:
with VideoFileClip(session.output_path) as clip:
duration = clip.duration
except:
duration = 0.0
result = {
"session_id": session_id,
"output_path": session.output_path,
"file_size_bytes": file_size,
"duration_seconds": duration,
"status": "recording_stopped",
}
# Clean up session
del recording_sessions[session_id]
return result
@mcp.tool()
def mcp_video_concatenate(
input_clips: list[str], output_path: str, transition_type: str = "cut"
) -> dict[str, str | float]:
"""
Join multiple video clips into single file.
Args:
input_clips: Array of video file paths to concatenate
output_path: Combined video output location
transition_type: Type of transition ("cut", "fade", "dissolve")
Returns:
dict with output path and total duration
"""
try:
clips = []
total_duration = 0.0
for clip_path in input_clips:
if not os.path.exists(clip_path):
return {"error": f"Input file not found: {clip_path}"}
clip = VideoFileClip(clip_path)
clips.append(clip)
total_duration += clip.duration
if not clips:
return {"error": "No valid input clips provided"}
# Apply transitions if specified
if transition_type == "fade":
# Add crossfade transitions between clips
for i in range(1, len(clips)):
clips[i] = clips[i].crossfadein(1.0)
# Concatenate clips
final_clip = concatenate_videoclips(clips, method="compose")
final_clip.write_videofile(output_path, audio_codec="aac")
# Clean up
for clip in clips:
clip.close()
final_clip.close()
return {
"output_path": output_path,
"total_duration": total_duration,
"clips_count": len(input_clips),
"status": "success",
}
except Exception as e:
return {"error": f"Concatenation failed: {e!s}"}
@mcp.tool()
def mcp_video_trim(
input_path: str, start_time: float, end_time: float, output_path: str
) -> dict[str, str | float]:
"""
Cut video segments to specific timeframes.
Args:
input_path: Input video file path
start_time: Start time in seconds
end_time: End time in seconds
output_path: Output video file path
Returns:
dict with output path and trimmed duration
"""
try:
if not os.path.exists(input_path):
return {"error": f"Input file not found: {input_path}"}
with VideoFileClip(input_path) as clip:
if (
start_time >= clip.duration
or end_time > clip.duration
or start_time >= end_time
):
return {"error": "Invalid time range specified"}
trimmed_clip = clip.subclip(start_time, end_time)
trimmed_clip.write_videofile(output_path, audio_codec="aac")
duration = trimmed_clip.duration
trimmed_clip.close()
return {
"output_path": output_path,
"original_duration": clip.duration,
"trimmed_duration": duration,
"start_time": start_time,
"end_time": end_time,
"status": "success",
}
except Exception as e:
return {"error": f"Trimming failed: {e!s}"}
@mcp.tool()
def mcp_video_speed_control(
input_path: str,
speed_multiplier: float,
output_path: str,
start_time: float | None = None,
end_time: float | None = None,
) -> dict[str, str | float]:
"""
Adjust playback speed for specific segments.
Args:
input_path: Input video file path
speed_multiplier: Speed multiplier (0.5 = half speed, 2.0 = double speed)
output_path: Output video file path
start_time: Optional start time for speed change (entire video if not specified)
end_time: Optional end time for speed change
Returns:
dict with output path and new duration
"""
try:
if not os.path.exists(input_path):
return {"error": f"Input file not found: {input_path}"}
with VideoFileClip(input_path) as clip:
if start_time is not None and end_time is not None:
# Apply speed change to specific segment
before_clip = clip.subclip(0, start_time) if start_time > 0 else None
speed_clip = clip.subclip(start_time, end_time).fx(
speedx, speed_multiplier
)
after_clip = (
clip.subclip(end_time) if end_time < clip.duration else None
)
clips_to_concat = [
c for c in [before_clip, speed_clip, after_clip] if c is not None
]
final_clip = concatenate_videoclips(clips_to_concat)
else:
# Apply speed change to entire video
final_clip = clip.fx(speedx, speed_multiplier)
final_clip.write_videofile(output_path, audio_codec="aac")
new_duration = final_clip.duration
final_clip.close()
return {
"output_path": output_path,
"original_duration": clip.duration,
"new_duration": new_duration,
"speed_multiplier": speed_multiplier,
"status": "success",
}
except Exception as e:
return {"error": f"Speed control failed: {e!s}"}
@mcp.tool()
def mcp_video_add_overlay(
input_path: str,
output_path: str,
overlay_type: str,
text: str | None = None,
position: str = "center",
duration: float | None = None,
start_time: float = 0,
style: dict | None = None,
) -> dict[str, str]:
"""
Add graphics, text, shapes over video content.
Args:
input_path: Input video file path
output_path: Output video file path
overlay_type: Type of overlay ("text", "image", "shape", "arrow")
text: Text content (for text overlays)
position: Position on screen ("center", "top-left", "bottom-right", etc.)
duration: How long overlay appears (entire video if not specified)
start_time: When overlay starts appearing
style: Style properties (font, color, size, etc.)
Returns:
dict with output path and overlay details
"""
try:
if not os.path.exists(input_path):
return {"error": f"Input file not found: {input_path}"}
with VideoFileClip(input_path) as clip:
if overlay_type == "text" and text:
# Default style
default_style = {"fontsize": 50, "color": "white", "font": "Arial-Bold"}
if style:
default_style.update(style)
# Create text clip
txt_clip = (
TextClip(
text,
fontsize=default_style["fontsize"],
color=default_style["color"],
font=default_style["font"],
)
.set_position(position)
.set_start(start_time)
)
if duration:
txt_clip = txt_clip.set_duration(duration)
else:
txt_clip = txt_clip.set_duration(clip.duration - start_time)
# Composite video with text overlay
final_clip = CompositeVideoClip([clip, txt_clip])
final_clip.write_videofile(output_path, audio_codec="aac")
final_clip.close()
else:
return {"error": f"Overlay type '{overlay_type}' not implemented yet"}
return {
"output_path": output_path,
"overlay_type": overlay_type,
"position": position,
"start_time": start_time,
"duration": duration or (clip.duration - start_time),
"status": "success",
}
except Exception as e:
return {"error": f"Overlay addition failed: {e!s}"}
@mcp.tool()
def mcp_video_format_convert(
input_path: str,
output_path: str,
output_format: str = "mp4",
quality_preset: str = "balanced",
compression_level: str = "medium",
) -> dict[str, str | int | float]:
"""
Export to different video formats and qualities.
Args:
input_path: Input video file path
output_path: Output video file path
output_format: Target format ("mp4", "webm", "mov", "avi")
quality_preset: Quality preset ("web-optimized", "high-quality", "mobile", "balanced")
compression_level: Compression level ("low", "medium", "high")
Returns:
dict with conversion results and file info
"""
try:
if not os.path.exists(input_path):
return {"error": f"Input file not found: {input_path}"}
# Quality settings based on preset
quality_settings = {
"web-optimized": {"crf": 28, "preset": "medium"},
"high-quality": {"crf": 18, "preset": "slow"},
"mobile": {"crf": 32, "preset": "fast"},
"balanced": {"crf": 23, "preset": "medium"},
}
settings = quality_settings.get(quality_preset, quality_settings["balanced"])
with VideoFileClip(input_path) as clip:
# Write with specified format and quality
codec_map = {
"mp4": "libx264",
"webm": "libvpx-vp9",
"mov": "libx264",
"avi": "libx264",
}
codec = codec_map.get(output_format.lower(), "libx264")
clip.write_videofile(
output_path, codec=codec, audio_codec="aac", preset=settings["preset"]
)
original_size = os.path.getsize(input_path)
converted_size = os.path.getsize(output_path)
compression_ratio = (
converted_size / original_size if original_size > 0 else 0
)
return {
"output_path": output_path,
"output_format": output_format,
"quality_preset": quality_preset,
"original_size_bytes": original_size,
"converted_size_bytes": converted_size,
"compression_ratio": compression_ratio,
"duration": clip.duration,
"status": "success",
}
except Exception as e:
return {"error": f"Format conversion failed: {e!s}"}
@mcp.tool()
def mcp_audio_mix_tracks(
audio_files: list[str],
output_path: str,
volume_levels: list[float] | None = None,
sync_timing: list[float] | None = None,
) -> dict[str, str | float | int]:
"""
Combine multiple audio tracks with volume control and timing.
Args:
audio_files: list of audio file paths to mix
output_path: Output audio file path
volume_levels: Volume multipliers for each track (1.0 = original volume)
sync_timing: Start times for each track in seconds
Returns:
dict with output path and mixing details
"""
try:
if not audio_files:
return {"error": "No audio files provided"}
audio_clips = []
total_duration = 0.0
for i, audio_path in enumerate(audio_files):
if not os.path.exists(audio_path):
return {"error": f"Audio file not found: {audio_path}"}
clip = AudioFileClip(audio_path)
# Apply volume adjustment if specified
if volume_levels and i < len(volume_levels):
clip = clip.fx(volumex, volume_levels[i])
# Apply timing offset if specified
if sync_timing and i < len(sync_timing):
clip = clip.set_start(sync_timing[i])
audio_clips.append(clip)
clip_end_time = (
sync_timing[i] if sync_timing and i < len(sync_timing) else 0
) + clip.duration
total_duration = max(total_duration, clip_end_time)
# Composite all audio clips
from moviepy.audio.AudioClip import CompositeAudioClip
final_audio = CompositeAudioClip(audio_clips)
final_audio.write_audiofile(output_path)
# Clean up
for clip in audio_clips:
clip.close()
final_audio.close()
return {
"output_path": output_path,
"total_duration": total_duration,
"tracks_count": len(audio_files),
"volume_levels": volume_levels or [1.0] * len(audio_files),
"status": "success",
}
except Exception as e:
return {"error": f"Audio mixing failed: {e!s}"}
@mcp.tool()
def mcp_audio_sync_video(
video_path: str,
audio_path: str,
output_path: str,
audio_start_time: float = 0.0,
replace_audio: bool = True,
) -> dict[str, str | float]:
"""
Synchronize audio track with video timeline.
Args:
video_path: Input video file path
audio_path: Audio file to sync with video
output_path: Output video file path
audio_start_time: When audio should start in video timeline
replace_audio: Whether to replace existing audio or mix with it
Returns:
dict with output path and sync details
"""
try:
if not os.path.exists(video_path):
return {"error": f"Video file not found: {video_path}"}
if not os.path.exists(audio_path):
return {"error": f"Audio file not found: {audio_path}"}
video_clip = VideoFileClip(video_path)
audio_clip = AudioFileClip(audio_path).set_start(audio_start_time)
if replace_audio:
# Replace original audio with new audio
final_clip = video_clip.set_audio(audio_clip)
else:
# Mix new audio with existing audio
if video_clip.audio:
from moviepy.audio.AudioClip import CompositeAudioClip
mixed_audio = CompositeAudioClip([video_clip.audio, audio_clip])
final_clip = video_clip.set_audio(mixed_audio)
else:
final_clip = video_clip.set_audio(audio_clip)
final_clip.write_videofile(output_path, audio_codec="aac")
# Clean up
video_clip.close()
audio_clip.close()
final_clip.close()
return {
"output_path": output_path,
"video_duration": video_clip.duration,
"audio_start_time": audio_start_time,
"audio_duration": audio_clip.duration,
"replace_audio": replace_audio,
"status": "success",
}
except Exception as e:
return {"error": f"Audio sync failed: {e!s}"}
@mcp.tool()
def mcp_video_add_branding(
input_path: str,
output_path: str,
logo_path: str | None = None,
brand_colors: dict | None = None,
position: str = "bottom-right",
opacity: float = 0.8,
size_scale: float = 0.1,
) -> dict[str, str | dict]:
"""
Apply consistent branding elements (logos, colors) to video.
Args:
input_path: Input video file path
output_path: Output video file path
logo_path: Path to logo image file
brand_colors: Brand color scheme dict
position: Logo position ("bottom-right", "top-left", "center", etc.)
opacity: Logo opacity (0.0 to 1.0)
size_scale: Logo size relative to video dimensions
Returns:
dict with output path and branding details
"""
try:
if not os.path.exists(input_path):
return {"error": f"Input video file not found: {input_path}"}
video_clip = VideoFileClip(input_path)
if logo_path and os.path.exists(logo_path):
# Add logo overlay
from moviepy.editor import ImageClip
logo_clip = ImageClip(logo_path, transparent=True)
# Scale logo based on video dimensions
video_width, video_height = video_clip.size
logo_width = int(video_width * size_scale)
logo_clip = logo_clip.resize(width=logo_width)
# Position logo
position_map = {
"top-left": ("left", "top"),
"top-right": ("right", "top"),
"bottom-left": ("left", "bottom"),
"bottom-right": ("right", "bottom"),
"center": ("center", "center"),
}
pos = position_map.get(position, ("right", "bottom"))
logo_clip = (
logo_clip.set_position(pos)
.set_duration(video_clip.duration)
.set_opacity(opacity)
)
# Composite video with logo
final_clip = CompositeVideoClip([video_clip, logo_clip])
else:
final_clip = video_clip
final_clip.write_videofile(output_path, audio_codec="aac")
# Clean up
video_clip.close()
if logo_path:
logo_clip.close()
final_clip.close()
return {
"output_path": output_path,
"logo_path": logo_path,
"position": position,
"opacity": opacity,
"size_scale": size_scale,
"brand_colors": brand_colors or {},
"status": "success",
}
except Exception as e:
return {"error": f"Branding application failed: {e!s}"}
@mcp.tool()
def mcp_video_resolution_optimizer(
input_path: str,
output_directory: str,
target_resolutions: list[str] = None,
quality_settings: dict | None = None,
) -> dict[str, str | list | dict]:
"""
Generate multiple resolutions from source video.
Args:
input_path: Input video file path
output_directory: Directory to save optimized versions
target_resolutions: list of target resolutions (e.g., ["1080p", "720p", "480p"])
quality_settings: Quality settings for each resolution
Returns:
dict with generated file paths and optimization details
"""
try:
if not os.path.exists(input_path):
return {"error": f"Input video file not found: {input_path}"}
os.makedirs(output_directory, exist_ok=True)
if target_resolutions is None:
target_resolutions = ["1080p", "720p", "480p"]
resolution_map = {
"1080p": (1920, 1080),
"720p": (1280, 720),
"480p": (854, 480),
"360p": (640, 360),
}
video_clip = VideoFileClip(input_path)
original_size = video_clip.size
base_filename = Path(input_path).stem
generated_files = []
for res in target_resolutions:
if res not in resolution_map:
continue
target_width, target_height = resolution_map[res]
# Skip if target resolution is larger than original
if target_width > original_size[0] or target_height > original_size[1]:
continue
# Resize video
resized_clip = video_clip.resize((target_width, target_height))
# Generate output filename
output_filename = f"{base_filename}_{res}.mp4"
output_path = os.path.join(output_directory, output_filename)
# Write resized video
resized_clip.write_videofile(output_path, audio_codec="aac")
generated_files.append(
{
"resolution": res,
"dimensions": f"{target_width}x{target_height}",
"output_path": output_path,
"file_size": os.path.getsize(output_path),
}
)
resized_clip.close()
video_clip.close()
return {
"input_path": input_path,
"output_directory": output_directory,
"original_resolution": f"{original_size[0]}x{original_size[1]}",
"generated_files": generated_files,
"total_files": len(generated_files),
"status": "success",
}
except Exception as e:
return {"error": f"Resolution optimization failed: {e!s}"}
def main():
"""Main entry point for the MCP Video Editor server."""
mcp.run()
if __name__ == "__main__":
main()