Ryan Malloy 406cc891c3 Add font resources and fix validation issues
- Add font:// MCP resources for available and web-safe fonts
- Create mcp_video_test_fonts tool for font testing
- Fix return value validation (convert numbers to strings)
- Add Union import for type hints
- Clients can now discover available fonts before using overlays
2025-09-05 05:37:12 -06:00

935 lines
29 KiB
Python

"""MCP Video Editor Server - Professional video production tools."""
import os
from pathlib import Path
from typing import Union
from fastmcp import FastMCP
# Optional imports for video processing
try:
from moviepy import * # Import all MoviePy functionality
MOVIEPY_AVAILABLE = True
except ImportError as e:
print(f"MoviePy import error: {e}")
MOVIEPY_AVAILABLE = False
try:
import cv2
import numpy as np
CV2_AVAILABLE = True
except ImportError:
CV2_AVAILABLE = False
try:
import ffmpeg
FFMPEG_AVAILABLE = True
except ImportError:
FFMPEG_AVAILABLE = False
# Initialize FastMCP server
mcp = FastMCP("MCP Video Editor")
def get_available_fonts():
    """Discover fonts usable for video text overlays.

    Tries matplotlib's font manager for real system fonts, then tops the
    list up with common web-safe fallbacks that were not already found.

    Returns:
        list[dict]: At most 30 font descriptors. System fonts carry
        "name", "path" and "family"; web-safe fallbacks carry "name",
        "family" and "web_safe".
    """
    fonts = []
    # Method 1: real system fonts via matplotlib (optional dependency).
    try:
        import matplotlib.font_manager as fm

        seen_names = set()  # set lookup instead of O(n) list scans
        for font_path in fm.findSystemFonts()[:50]:  # limit scan to 50 files
            try:
                prop = fm.FontProperties(fname=font_path)
                name = prop.get_name()
            except Exception:
                # Unreadable/corrupt font file -- skip it. The original bare
                # `except:` also swallowed KeyboardInterrupt/SystemExit.
                continue
            if name and name not in seen_names:
                seen_names.add(name)
                families = prop.get_family()
                fonts.append(
                    {
                        "name": name,
                        "path": font_path,
                        "family": families[0] if families else "Unknown",
                    }
                )
    except ImportError:
        pass
    # Method 2: common web-safe fonts, added only when not already discovered.
    common_fonts = [
        {"name": "Arial", "family": "sans-serif", "web_safe": True},
        {"name": "Helvetica", "family": "sans-serif", "web_safe": True},
        {"name": "Times New Roman", "family": "serif", "web_safe": True},
        {"name": "Courier New", "family": "monospace", "web_safe": True},
        {"name": "Verdana", "family": "sans-serif", "web_safe": True},
        {"name": "Georgia", "family": "serif", "web_safe": True},
        {"name": "Comic Sans MS", "family": "cursive", "web_safe": True},
        {"name": "Impact", "family": "fantasy", "web_safe": True},
    ]
    existing_names = {f["name"] for f in fonts}
    fonts.extend(cf for cf in common_fonts if cf["name"] not in existing_names)
    return fonts[:30]  # cap the response size
# Font discovery resources exposed to MCP clients
@mcp.resource("font://available")
def get_available_fonts_resource():
    """List all available fonts for text overlays."""
    discovered = get_available_fonts()
    return {
        "description": "Available fonts for video text overlays",
        "total_fonts": len(discovered),
        "fonts": discovered,
    }
@mcp.resource("font://web-safe")
def get_websafe_fonts_resource():
"""List web-safe fonts guaranteed to work."""
web_safe_fonts = [
{"name": "Arial", "family": "sans-serif", "description": "Clean, modern sans-serif"},
{"name": "Helvetica", "family": "sans-serif", "description": "Professional sans-serif"},
{"name": "Times New Roman", "family": "serif", "description": "Classic serif font"},
{"name": "Courier New", "family": "monospace", "description": "Fixed-width font"},
{"name": "Verdana", "family": "sans-serif", "description": "Web-optimized sans-serif"},
{"name": "Georgia", "family": "serif", "description": "Web-optimized serif"},
]
return {
"description": "Web-safe fonts guaranteed to work across platforms",
"fonts": web_safe_fonts
}
def _check_moviepy_dependency():
"""Check if MoviePy is available for video processing."""
if not MOVIEPY_AVAILABLE:
return {"error": "MoviePy is not installed. Install with: pip install moviepy"}
return None
class VideoRecordingSession:
    """Tracks the state of a single screen-recording session."""

    def __init__(
        self, session_id: str, filename: str, resolution: tuple, framerate: int
    ):
        # Identity and capture parameters supplied by the caller.
        self.session_id = session_id
        self.filename = filename
        self.resolution = resolution
        self.framerate = framerate
        # Mutable state: flipped on by start(), off by the stop tool.
        self.is_recording = False
        self.output_path = None

    def start(self, output_dir: str = "./temp_videos"):
        """Mark the session as recording and return its output file path."""
        os.makedirs(output_dir, exist_ok=True)
        self.output_path = os.path.join(output_dir, self.filename)
        self.is_recording = True
        return self.output_path
# Global store of active recording sessions, keyed by the uuid4 session_id
# issued by mcp_video_recorder_start; entries are deleted when stopped.
recording_sessions: dict[str, VideoRecordingSession] = {}
@mcp.tool()
def mcp_video_editor_status() -> dict[str, str | bool | int]:
    """
    Get the status of the MCP Video Editor server and available capabilities.
    Returns:
        dict with server status and available features
    """
    status: dict[str, str | bool | int] = {
        "server_name": "MCP Video Editor",
        "version": "0.1.0",
        "status": "running",
    }
    # Capability flags reflect which optional libraries imported successfully.
    status["moviepy_available"] = MOVIEPY_AVAILABLE
    status["opencv_available"] = CV2_AVAILABLE
    status["ffmpeg_available"] = FFMPEG_AVAILABLE
    status["active_sessions"] = len(recording_sessions)
    return status
@mcp.tool()
def mcp_video_recorder_start(
    filename: str,
    resolution: str = "1920x1080",
    framerate: int = 30,
    region: str | None = None,
) -> dict[str, str | int]:
    """
    Start reliable video capture with persistent recording sessions.

    Args:
        filename: Video output filename (e.g., "demo.mp4")
        resolution: Recording resolution as "WIDTHxHEIGHT" (e.g., "1920x1080")
        framerate: Recording framerate (e.g., 30, 60)
        region: Optional screen region coordinates as "x,y,width,height"

    Returns:
        dict with session_id for tracking and recording details, or an
        "error" entry when the parameters are invalid.
    """
    import uuid

    # Validate up front: previously a malformed string such as "1080p"
    # escaped as an unhandled ValueError from map()/int().
    try:
        width_str, height_str = resolution.split("x")
        width, height = int(width_str), int(height_str)
    except ValueError:
        return {"error": f"Invalid resolution '{resolution}'; expected WIDTHxHEIGHT"}
    if width <= 0 or height <= 0 or framerate <= 0:
        return {"error": "Resolution dimensions and framerate must be positive"}

    # NOTE(review): `region` is documented but never applied to the session;
    # confirm whether region capture is still planned.
    session_id = str(uuid.uuid4())
    session = VideoRecordingSession(session_id, filename, (width, height), framerate)
    output_path = session.start()
    recording_sessions[session_id] = session
    return {
        "session_id": session_id,
        "filename": filename,
        "resolution": resolution,
        "framerate": framerate,
        "output_path": output_path,
        "status": "recording_started",
    }
@mcp.tool()
def mcp_video_recorder_stop(session_id: str) -> dict[str, str | int | float]:
    """
    Stop recording and ensure file is saved.

    Args:
        session_id: Recording session ID from start command

    Returns:
        dict with file path and recording statistics, or an "error" entry
        for an unknown session.
    """
    if session_id not in recording_sessions:
        return {"error": f"Session {session_id} not found"}
    session = recording_sessions[session_id]
    session.is_recording = False
    # Gather statistics for the (possibly not yet written) output file.
    file_size = 0
    duration = 0.0
    if session.output_path and os.path.exists(session.output_path):
        file_size = os.path.getsize(session.output_path)
        # Guard explicitly: VideoFileClip is undefined when MoviePy is
        # missing, and the previous bare `except:` silently hid that
        # NameError along with every other error.
        if MOVIEPY_AVAILABLE:
            try:
                with VideoFileClip(session.output_path) as clip:
                    duration = clip.duration
            except Exception:
                # Unreadable/partial file -- report a zero duration.
                duration = 0.0
    result = {
        "session_id": session_id,
        "output_path": session.output_path,
        "file_size_bytes": file_size,
        "duration_seconds": duration,
        "status": "recording_stopped",
    }
    # Drop the finished session from the global registry.
    del recording_sessions[session_id]
    return result
@mcp.tool()
def mcp_video_concatenate(
    input_clips: list[str], output_path: str, transition_type: str = "cut"
) -> dict[str, str | float]:
    """
    Join multiple video clips into single file.

    Args:
        input_clips: Array of video file paths to concatenate
        output_path: Combined video output location
        transition_type: Type of transition ("cut", "fade", "dissolve")

    Returns:
        dict with output path and total duration, or an "error" entry.
    """
    clips = []
    try:
        total_duration = 0.0
        for clip_path in input_clips:
            if not os.path.exists(clip_path):
                return {"error": f"Input file not found: {clip_path}"}
            clip = VideoFileClip(clip_path)
            clips.append(clip)
            total_duration += clip.duration
        if not clips:
            return {"error": "No valid input clips provided"}
        if transition_type == "fade":
            # Crossfade each clip (after the first) into the previous one.
            # NOTE(review): crossfadein is the MoviePy 1.x API while other
            # tools in this file use the 2.x with_* API -- confirm version.
            for i in range(1, len(clips)):
                clips[i] = clips[i].crossfadein(1.0)
        final_clip = concatenate_videoclips(clips, method="compose")
        try:
            final_clip.write_videofile(output_path, audio_codec="aac")
        finally:
            final_clip.close()
        return {
            "output_path": output_path,
            "total_duration": total_duration,
            "clips_count": len(input_clips),
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Concatenation failed: {e!s}"}
    finally:
        # Close every opened source clip even when an error or early return
        # aborts the pipeline; the original leaked file handles there.
        for clip in clips:
            clip.close()
@mcp.tool()
def mcp_video_trim(
    input_path: str, start_time: float, end_time: float, output_path: str
) -> dict[str, str | float]:
    """
    Cut video segments to specific timeframes.

    Args:
        input_path: Input video file path
        start_time: Start time in seconds (must be >= 0)
        end_time: End time in seconds
        output_path: Output video file path

    Returns:
        dict with output path and trimmed duration, or an "error" entry.
    """
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input file not found: {input_path}"}
        with VideoFileClip(input_path) as clip:
            # Also reject negative starts; the original only caught
            # out-of-range and inverted ranges.
            if (
                start_time < 0
                or start_time >= clip.duration
                or end_time > clip.duration
                or start_time >= end_time
            ):
                return {"error": "Invalid time range specified"}
            trimmed_clip = clip.subclip(start_time, end_time)
            try:
                trimmed_clip.write_videofile(output_path, audio_codec="aac")
                duration = trimmed_clip.duration
            finally:
                # Release the sub-clip even when the write fails.
                trimmed_clip.close()
            return {
                "output_path": output_path,
                "original_duration": clip.duration,
                "trimmed_duration": duration,
                "start_time": start_time,
                "end_time": end_time,
                "status": "success",
            }
    except Exception as e:
        return {"error": f"Trimming failed: {e!s}"}
@mcp.tool()
def mcp_video_speed_control(
    input_path: str,
    speed_multiplier: float,
    output_path: str,
    start_time: float | None = None,
    end_time: float | None = None,
) -> dict[str, str | float]:
    """
    Adjust playback speed for specific segments.

    Args:
        input_path: Input video file path
        speed_multiplier: Speed multiplier (0.5 = half speed, 2.0 = double
            speed); must be greater than 0
        output_path: Output video file path
        start_time: Optional start time for speed change (entire video if
            not specified)
        end_time: Optional end time for speed change

    Returns:
        dict with output path and new duration, or an "error" entry.
    """
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input file not found: {input_path}"}
        # A zero or negative multiplier would fail deep inside MoviePy with
        # an obscure error; fail fast with a clear message instead.
        if speed_multiplier <= 0:
            return {"error": "speed_multiplier must be greater than 0"}
        with VideoFileClip(input_path) as clip:
            if start_time is not None and end_time is not None:
                # Speed up only [start_time, end_time]; keep the rest as-is.
                before_clip = clip.subclip(0, start_time) if start_time > 0 else None
                speed_clip = clip.subclip(start_time, end_time).fx(
                    speedx, speed_multiplier
                )
                after_clip = (
                    clip.subclip(end_time) if end_time < clip.duration else None
                )
                segments = [
                    c for c in (before_clip, speed_clip, after_clip) if c is not None
                ]
                final_clip = concatenate_videoclips(segments)
            else:
                # Apply the speed change to the entire video.
                final_clip = clip.fx(speedx, speed_multiplier)
            try:
                final_clip.write_videofile(output_path, audio_codec="aac")
                new_duration = final_clip.duration
            finally:
                # Release the derived clip even when the write fails.
                final_clip.close()
            return {
                "output_path": output_path,
                "original_duration": clip.duration,
                "new_duration": new_duration,
                "speed_multiplier": speed_multiplier,
                "status": "success",
            }
    except Exception as e:
        return {"error": f"Speed control failed: {e!s}"}
@mcp.tool()
def mcp_video_add_overlay(
    input_path: str,
    output_path: str,
    overlay_type: str,
    text: str | None = None,
    position: str = "center",
    duration: float | None = None,
    start_time: float = 0,
    style: dict | None = None,
) -> dict[str, str]:
    """
    Add graphics, text, shapes over video content.

    Args:
        input_path: Input video file path
        output_path: Output video file path
        overlay_type: Type of overlay ("text", "image", "shape", "arrow");
            only "text" is implemented so far
        text: Text content (for text overlays)
        position: Position on screen ("center", "top-left", "bottom-right", etc.)
        duration: How long overlay appears (entire video if not specified)
        start_time: When overlay starts appearing
        style: Style properties ("font_size", "color")

    Returns:
        dict with output path and overlay details (numeric fields are
        stringified to satisfy the dict[str, str] return schema), or an
        "error" entry.
    """
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input file not found: {input_path}"}
        with VideoFileClip(input_path) as clip:
            if overlay_type == "text" and text:
                # Caller style overrides the defaults key-by-key.
                default_style = {"font_size": 50, "color": "white"}
                if style:
                    default_style.update(style)
                # Default the overlay to the remainder of the video.
                overlay_duration = duration or (clip.duration - start_time)
                txt_clip = (
                    TextClip(
                        text=text,
                        font_size=default_style["font_size"],
                        color=default_style["color"],
                        duration=overlay_duration,
                    )
                    .with_position(position)
                    .with_start(start_time)
                )
                final_clip = CompositeVideoClip([clip, txt_clip])
                try:
                    final_clip.write_videofile(output_path, audio_codec="aac")
                finally:
                    # Close the text clip too -- the original leaked it --
                    # and release the composite even when the write fails.
                    final_clip.close()
                    txt_clip.close()
            else:
                return {"error": f"Overlay type '{overlay_type}' not implemented yet"}
            return {
                "output_path": output_path,
                "overlay_type": overlay_type,
                "position": position,
                "start_time": str(start_time),
                "duration": str(overlay_duration),
                "status": "success",
            }
    except Exception as e:
        return {"error": f"Overlay addition failed: {e!s}"}
@mcp.tool()
def mcp_video_format_convert(
    input_path: str,
    output_path: str,
    output_format: str = "mp4",
    quality_preset: str = "balanced",
    compression_level: str = "medium",
) -> dict[str, str | int | float]:
    """
    Export to different video formats and qualities.

    Args:
        input_path: Input video file path
        output_path: Output video file path
        output_format: Target format ("mp4", "webm", "mov", "avi")
        quality_preset: Quality preset ("web-optimized", "high-quality",
            "mobile", "balanced"); unknown values fall back to "balanced"
        compression_level: Compression level ("low", "medium", "high")

    Returns:
        dict with conversion results and file info, or an "error" entry.
    """
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input file not found: {input_path}"}
        # Each preset maps to an x264 CRF (quality) and encoder speed preset.
        quality_settings = {
            "web-optimized": {"crf": 28, "preset": "medium"},
            "high-quality": {"crf": 18, "preset": "slow"},
            "mobile": {"crf": 32, "preset": "fast"},
            "balanced": {"crf": 23, "preset": "medium"},
        }
        settings = quality_settings.get(quality_preset, quality_settings["balanced"])
        # NOTE(review): `compression_level` is accepted but currently unused;
        # confirm whether it should map onto CRF/preset as well.
        with VideoFileClip(input_path) as clip:
            codec_map = {
                "mp4": "libx264",
                "webm": "libvpx-vp9",
                "mov": "libx264",
                "avi": "libx264",
            }
            codec = codec_map.get(output_format.lower(), "libx264")
            # Bug fix: the preset's CRF was looked up but never applied, so
            # every preset produced identical output quality. Forward it to
            # ffmpeg explicitly.
            clip.write_videofile(
                output_path,
                codec=codec,
                audio_codec="aac",
                preset=settings["preset"],
                ffmpeg_params=["-crf", str(settings["crf"])],
            )
            original_size = os.path.getsize(input_path)
            converted_size = os.path.getsize(output_path)
            compression_ratio = (
                converted_size / original_size if original_size > 0 else 0
            )
            return {
                "output_path": output_path,
                "output_format": output_format,
                "quality_preset": quality_preset,
                "original_size_bytes": original_size,
                "converted_size_bytes": converted_size,
                "compression_ratio": compression_ratio,
                "duration": clip.duration,
                "status": "success",
            }
    except Exception as e:
        return {"error": f"Format conversion failed: {e!s}"}
@mcp.tool()
def mcp_audio_mix_tracks(
    audio_files: list[str],
    output_path: str,
    volume_levels: list[float] | None = None,
    sync_timing: list[float] | None = None,
) -> dict[str, str | float | int]:
    """
    Combine multiple audio tracks with volume control and timing.

    Args:
        audio_files: list of audio file paths to mix
        output_path: Output audio file path
        volume_levels: Volume multipliers for each track (1.0 = original volume)
        sync_timing: Start times for each track in seconds

    Returns:
        dict with output path and mixing details, or an "error" entry.
    """
    try:
        if not audio_files:
            return {"error": "No audio files provided"}
        audio_clips = []
        try:
            total_duration = 0.0
            for i, audio_path in enumerate(audio_files):
                if not os.path.exists(audio_path):
                    return {"error": f"Audio file not found: {audio_path}"}
                clip = AudioFileClip(audio_path)
                # Per-track volume, when provided.
                if volume_levels and i < len(volume_levels):
                    clip = clip.fx(volumex, volume_levels[i])
                # Per-track start offset, when provided.
                if sync_timing and i < len(sync_timing):
                    clip = clip.set_start(sync_timing[i])
                audio_clips.append(clip)
                offset = sync_timing[i] if sync_timing and i < len(sync_timing) else 0
                # The mix lasts until the latest-ending track.
                total_duration = max(total_duration, offset + clip.duration)
            # Composite all audio clips into a single track.
            from moviepy.audio.AudioClip import CompositeAudioClip

            final_audio = CompositeAudioClip(audio_clips)
            try:
                final_audio.write_audiofile(output_path)
            finally:
                final_audio.close()
            return {
                "output_path": output_path,
                "total_duration": total_duration,
                "tracks_count": len(audio_files),
                "volume_levels": volume_levels or [1.0] * len(audio_files),
                "status": "success",
            }
        finally:
            # Close every opened clip even when an early return or exception
            # aborts the mix; the original leaked file handles there.
            for clip in audio_clips:
                clip.close()
    except Exception as e:
        return {"error": f"Audio mixing failed: {e!s}"}
@mcp.tool()
def mcp_audio_sync_video(
    video_path: str,
    audio_path: str,
    output_path: str,
    audio_start_time: float = 0.0,
    replace_audio: bool = True,
) -> dict[str, str | float]:
    """
    Synchronize audio track with video timeline.

    Args:
        video_path: Input video file path
        audio_path: Audio file to sync with video
        output_path: Output video file path
        audio_start_time: When audio should start in video timeline
        replace_audio: Whether to replace existing audio or mix with it

    Returns:
        dict with output path and sync details, or an "error" entry.
    """
    video_clip = None
    audio_clip = None
    final_clip = None
    try:
        if not os.path.exists(video_path):
            return {"error": f"Video file not found: {video_path}"}
        if not os.path.exists(audio_path):
            return {"error": f"Audio file not found: {audio_path}"}
        video_clip = VideoFileClip(video_path)
        audio_clip = AudioFileClip(audio_path).set_start(audio_start_time)
        if replace_audio:
            # Replace original audio with new audio.
            final_clip = video_clip.set_audio(audio_clip)
        elif video_clip.audio:
            # Mix new audio with the existing track.
            from moviepy.audio.AudioClip import CompositeAudioClip

            mixed_audio = CompositeAudioClip([video_clip.audio, audio_clip])
            final_clip = video_clip.set_audio(mixed_audio)
        else:
            # No existing audio -- just attach the new track.
            final_clip = video_clip.set_audio(audio_clip)
        final_clip.write_videofile(output_path, audio_codec="aac")
        # Capture durations BEFORE cleanup; the original read attributes of
        # already-closed clips, which relies on unspecified MoviePy behavior.
        video_duration = video_clip.duration
        audio_duration = audio_clip.duration
        return {
            "output_path": output_path,
            "video_duration": video_duration,
            "audio_start_time": audio_start_time,
            "audio_duration": audio_duration,
            "replace_audio": replace_audio,
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Audio sync failed: {e!s}"}
    finally:
        # Close clips even when an exception aborts the sync.
        for c in (final_clip, audio_clip, video_clip):
            if c is not None:
                c.close()
@mcp.tool()
def mcp_video_add_branding(
    input_path: str,
    output_path: str,
    logo_path: str | None = None,
    brand_colors: dict | None = None,
    position: str = "bottom-right",
    opacity: float = 0.8,
    size_scale: float = 0.1,
) -> dict[str, str | dict]:
    """
    Apply consistent branding elements (logos, colors) to video.

    Args:
        input_path: Input video file path
        output_path: Output video file path
        logo_path: Path to logo image file
        brand_colors: Brand color scheme dict (currently passed through only)
        position: Logo position ("bottom-right", "top-left", "center", etc.)
        opacity: Logo opacity (0.0 to 1.0)
        size_scale: Logo size relative to video width

    Returns:
        dict with output path and branding details, or an "error" entry.
    """
    video_clip = None
    logo_clip = None
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input video file not found: {input_path}"}
        video_clip = VideoFileClip(input_path)
        if logo_path and os.path.exists(logo_path):
            # ImageClip comes from the module-level `from moviepy import *`;
            # the previous local `from moviepy.editor import ImageClip` fails
            # on MoviePy 2.x, where the editor module was removed.
            logo_clip = ImageClip(logo_path, transparent=True)
            # Scale the logo relative to the video width.
            video_width, video_height = video_clip.size
            logo_width = int(video_width * size_scale)
            logo_clip = logo_clip.resize(width=logo_width)
            # Translate the friendly position name into MoviePy coordinates.
            position_map = {
                "top-left": ("left", "top"),
                "top-right": ("right", "top"),
                "bottom-left": ("left", "bottom"),
                "bottom-right": ("right", "bottom"),
                "center": ("center", "center"),
            }
            pos = position_map.get(position, ("right", "bottom"))
            logo_clip = (
                logo_clip.set_position(pos)
                .set_duration(video_clip.duration)
                .set_opacity(opacity)
            )
            final_clip = CompositeVideoClip([video_clip, logo_clip])
        else:
            # No (valid) logo: pass the video through unchanged.
            final_clip = video_clip
        final_clip.write_videofile(output_path, audio_codec="aac")
        final_clip.close()
        return {
            "output_path": output_path,
            "logo_path": logo_path,
            "position": position,
            "opacity": opacity,
            "size_scale": size_scale,
            "brand_colors": brand_colors or {},
            "status": "success",
        }
    except Exception as e:
        return {"error": f"Branding application failed: {e!s}"}
    finally:
        # Bug fix: the original called logo_clip.close() whenever logo_path
        # was set, raising NameError when the logo file did not exist.
        if logo_clip is not None:
            logo_clip.close()
        if video_clip is not None:
            video_clip.close()
@mcp.tool()
def mcp_video_resolution_optimizer(
    input_path: str,
    output_directory: str,
    target_resolutions: list[str] | None = None,
    quality_settings: dict | None = None,
) -> dict[str, str | list | dict]:
    """
    Generate multiple resolutions from source video.

    Args:
        input_path: Input video file path
        output_directory: Directory to save optimized versions
        target_resolutions: list of target resolutions (e.g., ["1080p",
            "720p", "480p"]); defaults to those three
        quality_settings: Quality settings for each resolution

    Returns:
        dict with generated file paths and optimization details, or an
        "error" entry.
    """
    try:
        if not os.path.exists(input_path):
            return {"error": f"Input video file not found: {input_path}"}
        os.makedirs(output_directory, exist_ok=True)
        if target_resolutions is None:
            target_resolutions = ["1080p", "720p", "480p"]
        # NOTE(review): `quality_settings` is accepted but currently unused.
        resolution_map = {
            "1080p": (1920, 1080),
            "720p": (1280, 720),
            "480p": (854, 480),
            "360p": (640, 360),
        }
        video_clip = VideoFileClip(input_path)
        try:
            original_size = video_clip.size
            base_filename = Path(input_path).stem
            generated_files = []
            for res in target_resolutions:
                if res not in resolution_map:
                    continue  # silently skip unknown labels, as before
                target_width, target_height = resolution_map[res]
                # Never upscale past the source resolution.
                if target_width > original_size[0] or target_height > original_size[1]:
                    continue
                resized_clip = video_clip.resize((target_width, target_height))
                output_filename = f"{base_filename}_{res}.mp4"
                resized_path = os.path.join(output_directory, output_filename)
                try:
                    resized_clip.write_videofile(resized_path, audio_codec="aac")
                finally:
                    # Release each derived clip even when its write fails.
                    resized_clip.close()
                generated_files.append(
                    {
                        "resolution": res,
                        "dimensions": f"{target_width}x{target_height}",
                        "output_path": resized_path,
                        "file_size": os.path.getsize(resized_path),
                    }
                )
            return {
                "input_path": input_path,
                "output_directory": output_directory,
                "original_resolution": f"{original_size[0]}x{original_size[1]}",
                "generated_files": generated_files,
                "total_files": len(generated_files),
                "status": "success",
            }
        finally:
            # Close the source even when an exception aborts the loop;
            # the original leaked it on failure.
            video_clip.close()
    except Exception as e:
        return {"error": f"Resolution optimization failed: {e!s}"}
@mcp.tool()
def mcp_video_test_fonts(
    output_path: str,
    test_text: str = "InterNACHI Expert Agent",
    font_names: list[str] | None = None,
) -> dict[str, Union[str, int, list]]:
    """
    Create a video showcasing different fonts for testing.

    Args:
        output_path: Output video file path
        test_text: Text to display with each font
        font_names: List of font names to test (uses web-safe fonts if not
            provided)

    Returns:
        Dict with output path and font test results, or an "error" entry.
    """
    try:
        if not MOVIEPY_AVAILABLE:
            return {"error": "MoviePy is not available for video processing"}
        # Use web-safe fonts if none provided.
        if not font_names:
            font_names = ["Arial", "Times New Roman", "Courier New", "Verdana"]
        test_clips = []
        working_fonts = []
        failed_fonts = []
        try:
            for font_name in font_names:
                try:
                    # NOTE(review): font_name appears only in the caption and
                    # is never passed to TextClip, so every sample renders in
                    # the default font. TODO: pass a resolved font (path) once
                    # the target MoviePy version's font handling is confirmed.
                    text_clip = TextClip(
                        text=f"{test_text} - {font_name}",
                        font_size=48,
                        color='white',
                        duration=2,
                    ).with_position('center')
                    # Dark background for contrast behind the white text.
                    bg_clip = ColorClip(size=(1280, 720), color=(50, 50, 50)).with_duration(2)
                    composite = CompositeVideoClip([bg_clip, text_clip])
                    test_clips.append(composite)
                    working_fonts.append(font_name)
                except Exception as e:
                    failed_fonts.append({"font": font_name, "error": str(e)})
                    continue
            if not test_clips:
                return {"error": "No fonts worked for testing"}
            final_video = concatenate_videoclips(test_clips)
            try:
                final_video.write_videofile(output_path, fps=24)
            finally:
                final_video.close()
            return {
                "output_path": output_path,
                "total_fonts_tested": len(font_names),
                "working_fonts": working_fonts,
                "failed_fonts": failed_fonts,
                "working_count": len(working_fonts),
                "failed_count": len(failed_fonts),
                "video_duration": len(working_fonts) * 2,  # 2 seconds per font
                "status": "success",
            }
        finally:
            # Release every composite even when the final write fails;
            # the original leaked them on that path.
            for clip in test_clips:
                clip.close()
    except Exception as e:
        return {"error": f"Font testing failed: {str(e)}"}
def main():
    """Entry point: start the FastMCP server loop."""
    mcp.run()


if __name__ == "__main__":
    main()