- Use app.run_stdio_async() instead of deprecated stdio_server import - Aligns with FastMCP 2.11.3 API - Server now starts correctly with uv run mcp-office-tools - Maintains all MCPMixin functionality and tool registration
343 lines
14 KiB
Python
343 lines
14 KiB
Python
"""Universal Office Tools Mixin - Format-agnostic tools that work across all Office document types."""
|
|
|
|
import time
|
|
from typing import Any
|
|
|
|
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool
|
|
from pydantic import Field
|
|
|
|
from ..utils import (
|
|
OfficeFileError,
|
|
classify_document_type,
|
|
detect_format,
|
|
get_supported_extensions,
|
|
resolve_office_file_path,
|
|
validate_office_file,
|
|
)
|
|
|
|
|
|
class UniversalMixin(MCPMixin):
|
|
"""Mixin containing format-agnostic tools that work across Word, Excel, PowerPoint, and CSV files."""
|
|
|
|
@mcp_tool(
|
|
name="extract_text",
|
|
description="Extract text content from Office documents with intelligent method selection. Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt), and CSV files. Uses multi-library fallback for maximum compatibility."
|
|
)
|
|
async def extract_text(
|
|
self,
|
|
file_path: str = Field(description="Path to Office document or URL"),
|
|
preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"),
|
|
include_metadata: bool = Field(default=True, description="Include document metadata in output"),
|
|
method: str = Field(default="auto", description="Extraction method: auto, primary, fallback")
|
|
) -> dict[str, Any]:
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Resolve file path (download if URL)
|
|
local_path = await resolve_office_file_path(file_path)
|
|
|
|
# Validate file
|
|
validation = await validate_office_file(local_path)
|
|
if not validation["is_valid"]:
|
|
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
|
|
|
|
# Get format info
|
|
format_info = await detect_format(local_path)
|
|
category = format_info["category"]
|
|
extension = format_info["extension"]
|
|
|
|
# Extract text based on category with fallback
|
|
text_result = await self._extract_text_by_category(local_path, extension, category, preserve_formatting, method)
|
|
|
|
# Build response
|
|
result = {
|
|
"text": text_result["text"],
|
|
"metadata": {
|
|
"original_file": file_path,
|
|
"format": format_info["format_name"],
|
|
"extraction_method": text_result["method_used"],
|
|
"extraction_time": round(time.time() - start_time, 3),
|
|
"methods_tried": text_result.get("methods_tried", [text_result["method_used"]])
|
|
}
|
|
}
|
|
|
|
# Add formatted sections if preserved
|
|
if preserve_formatting and "formatted_sections" in text_result:
|
|
result["structure"] = text_result["formatted_sections"]
|
|
|
|
# Add metadata if requested
|
|
if include_metadata:
|
|
doc_metadata = await self._extract_basic_metadata(local_path, extension, category)
|
|
result["document_metadata"] = doc_metadata
|
|
|
|
return result
|
|
|
|
except OfficeFileError:
|
|
raise
|
|
except Exception as e:
|
|
raise OfficeFileError(f"Text extraction failed: {str(e)}")
|
|
|
|
@mcp_tool(
|
|
name="extract_images",
|
|
description="Extract images from Office documents with size filtering and format conversion."
|
|
)
|
|
async def extract_images(
|
|
self,
|
|
file_path: str = Field(description="Path to Office document or URL"),
|
|
min_width: int = Field(default=100, description="Minimum image width in pixels"),
|
|
min_height: int = Field(default=100, description="Minimum image height in pixels"),
|
|
output_format: str = Field(default="png", description="Output image format: png, jpg, jpeg"),
|
|
include_metadata: bool = Field(default=True, description="Include image metadata")
|
|
) -> dict[str, Any]:
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Resolve file path
|
|
local_path = await resolve_office_file_path(file_path)
|
|
|
|
# Validate file
|
|
validation = await validate_office_file(local_path)
|
|
if not validation["is_valid"]:
|
|
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
|
|
|
|
# Get format info
|
|
format_info = await detect_format(local_path)
|
|
category = format_info["category"]
|
|
extension = format_info["extension"]
|
|
|
|
# Extract images based on category
|
|
images = await self._extract_images_by_category(local_path, extension, category, output_format, min_width, min_height)
|
|
|
|
return {
|
|
"images": images,
|
|
"metadata": {
|
|
"original_file": file_path,
|
|
"format": format_info["format_name"],
|
|
"image_count": len(images),
|
|
"extraction_time": round(time.time() - start_time, 3),
|
|
"filters_applied": {
|
|
"min_width": min_width,
|
|
"min_height": min_height,
|
|
"output_format": output_format
|
|
}
|
|
}
|
|
}
|
|
|
|
except OfficeFileError:
|
|
raise
|
|
except Exception as e:
|
|
raise OfficeFileError(f"Image extraction failed: {str(e)}")
|
|
|
|
@mcp_tool(
|
|
name="extract_metadata",
|
|
description="Extract comprehensive metadata from Office documents."
|
|
)
|
|
async def extract_metadata(
|
|
self,
|
|
file_path: str = Field(description="Path to Office document or URL")
|
|
) -> dict[str, Any]:
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Resolve file path
|
|
local_path = await resolve_office_file_path(file_path)
|
|
|
|
# Validate file
|
|
validation = await validate_office_file(local_path)
|
|
if not validation["is_valid"]:
|
|
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
|
|
|
|
# Get format info
|
|
format_info = await detect_format(local_path)
|
|
category = format_info["category"]
|
|
extension = format_info["extension"]
|
|
|
|
# Extract metadata based on category
|
|
metadata = await self._extract_metadata_by_category(local_path, extension, category)
|
|
|
|
# Add extraction info
|
|
metadata["extraction_info"] = {
|
|
"extraction_time": round(time.time() - start_time, 3),
|
|
"format_detected": format_info["format_name"]
|
|
}
|
|
|
|
return metadata
|
|
|
|
except OfficeFileError:
|
|
raise
|
|
except Exception as e:
|
|
raise OfficeFileError(f"Metadata extraction failed: {str(e)}")
|
|
|
|
@mcp_tool(
|
|
name="detect_office_format",
|
|
description="Intelligent Office document format detection and analysis."
|
|
)
|
|
async def detect_office_format(
|
|
self,
|
|
file_path: str = Field(description="Path to Office document or URL")
|
|
) -> dict[str, Any]:
|
|
try:
|
|
# Resolve file path
|
|
local_path = await resolve_office_file_path(file_path)
|
|
|
|
# Get comprehensive format detection
|
|
format_info = await detect_format(local_path)
|
|
|
|
# Add classification
|
|
classification = await classify_document_type(local_path)
|
|
format_info.update(classification)
|
|
|
|
return format_info
|
|
|
|
except Exception as e:
|
|
raise OfficeFileError(f"Format detection failed: {str(e)}")
|
|
|
|
@mcp_tool(
|
|
name="analyze_document_health",
|
|
description="Comprehensive document health and integrity analysis."
|
|
)
|
|
async def analyze_document_health(
|
|
self,
|
|
file_path: str = Field(description="Path to Office document or URL")
|
|
) -> dict[str, Any]:
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Resolve file path
|
|
local_path = await resolve_office_file_path(file_path)
|
|
|
|
# Validate file thoroughly
|
|
validation = await validate_office_file(local_path)
|
|
|
|
# Get format detection
|
|
format_info = await detect_format(local_path)
|
|
|
|
# Build health report
|
|
health_report = {
|
|
"overall_health": "healthy" if validation["is_valid"] else "unhealthy",
|
|
"validation": validation,
|
|
"format_info": format_info,
|
|
"analysis_time": round(time.time() - start_time, 3)
|
|
}
|
|
|
|
# Add recommendations
|
|
if not validation["is_valid"]:
|
|
health_report["recommendations"] = [
|
|
"File validation failed - check for corruption",
|
|
"Try opening file in native application",
|
|
"Consider file recovery tools if data is critical"
|
|
]
|
|
else:
|
|
health_report["recommendations"] = [
|
|
"File appears healthy and readable",
|
|
"All validation checks passed"
|
|
]
|
|
|
|
return health_report
|
|
|
|
except Exception as e:
|
|
return {
|
|
"overall_health": "error",
|
|
"error": str(e),
|
|
"analysis_time": round(time.time() - start_time, 3),
|
|
"recommendations": [
|
|
"File could not be analyzed",
|
|
"Check file path and permissions",
|
|
"Verify file is not corrupted"
|
|
]
|
|
}
|
|
|
|
@mcp_tool(
|
|
name="get_supported_formats",
|
|
description="Get list of all supported Office document formats and their capabilities."
|
|
)
|
|
async def get_supported_formats(self) -> dict[str, Any]:
|
|
extensions = get_supported_extensions()
|
|
|
|
format_details = {}
|
|
for ext in extensions:
|
|
if ext.startswith('.doc'):
|
|
category = "word"
|
|
legacy = ext == ".doc"
|
|
elif ext.startswith('.xls') or ext == '.csv':
|
|
category = "excel"
|
|
legacy = ext == ".xls"
|
|
elif ext.startswith('.ppt'):
|
|
category = "powerpoint"
|
|
legacy = ext == ".ppt"
|
|
else:
|
|
category = "other"
|
|
legacy = False
|
|
|
|
format_details[ext] = {
|
|
"category": category,
|
|
"legacy_format": legacy,
|
|
"text_extraction": True,
|
|
"image_extraction": ext != ".csv",
|
|
"metadata_extraction": True,
|
|
"markdown_conversion": category == "word"
|
|
}
|
|
|
|
return {
|
|
"supported_extensions": extensions,
|
|
"format_details": format_details,
|
|
"categories": {
|
|
"word": [ext for ext, info in format_details.items() if info["category"] == "word"],
|
|
"excel": [ext for ext, info in format_details.items() if info["category"] == "excel"],
|
|
"powerpoint": [ext for ext, info in format_details.items() if info["category"] == "powerpoint"]
|
|
},
|
|
"total_formats": len(extensions)
|
|
}
|
|
|
|
# Helper methods - these will be imported from the original server.py
|
|
async def _extract_text_by_category(self, file_path: str, extension: str, category: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
|
|
"""Extract text based on document category."""
|
|
# Import the appropriate extraction function
|
|
from ..server_monolithic import _extract_word_text, _extract_excel_text, _extract_powerpoint_text
|
|
|
|
if category == "word":
|
|
return await _extract_word_text(file_path, extension, preserve_formatting, method)
|
|
elif category == "excel":
|
|
return await _extract_excel_text(file_path, extension, preserve_formatting, method)
|
|
elif category == "powerpoint":
|
|
return await _extract_powerpoint_text(file_path, extension, preserve_formatting, method)
|
|
else:
|
|
raise OfficeFileError(f"Unsupported document category: {category}")
|
|
|
|
async def _extract_images_by_category(self, file_path: str, extension: str, category: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
|
|
"""Extract images based on document category."""
|
|
from ..server_monolithic import _extract_word_images, _extract_excel_images, _extract_powerpoint_images
|
|
|
|
if category == "word":
|
|
return await _extract_word_images(file_path, extension, output_format, min_width, min_height)
|
|
elif category == "excel":
|
|
return await _extract_excel_images(file_path, extension, output_format, min_width, min_height)
|
|
elif category == "powerpoint":
|
|
return await _extract_powerpoint_images(file_path, extension, output_format, min_width, min_height)
|
|
else:
|
|
return [] # CSV and other formats don't support images
|
|
|
|
async def _extract_metadata_by_category(self, file_path: str, extension: str, category: str) -> dict[str, Any]:
|
|
"""Extract metadata based on document category."""
|
|
from ..server_monolithic import _extract_word_metadata, _extract_excel_metadata, _extract_powerpoint_metadata, _extract_basic_metadata
|
|
|
|
# Get basic metadata first
|
|
metadata = await _extract_basic_metadata(file_path, extension, category)
|
|
|
|
# Add category-specific metadata
|
|
if category == "word":
|
|
specific_metadata = await _extract_word_metadata(file_path, extension)
|
|
elif category == "excel":
|
|
specific_metadata = await _extract_excel_metadata(file_path, extension)
|
|
elif category == "powerpoint":
|
|
specific_metadata = await _extract_powerpoint_metadata(file_path, extension)
|
|
else:
|
|
specific_metadata = {}
|
|
|
|
metadata.update(specific_metadata)
|
|
return metadata
|
|
|
|
async def _extract_basic_metadata(self, file_path: str, extension: str, category: str) -> dict[str, Any]:
|
|
"""Extract basic metadata common to all documents."""
|
|
from ..server_monolithic import _extract_basic_metadata
|
|
return await _extract_basic_metadata(file_path, extension, category) |