Refactor server architecture using mcpmixin pattern
- Split monolithic 2209-line server.py into organized mixin classes:
  - UniversalMixin: format-agnostic tools (extract_text, extract_images, etc.)
  - WordMixin: Word-specific tools (convert_to_markdown with chapter_name support)
  - ExcelMixin: placeholder for future Excel-specific tools
  - PowerPointMixin: placeholder for future PowerPoint-specific tools

Benefits:
  - Improved maintainability and separation of concerns
  - Better testability with isolated mixins
  - Easier team collaboration on different file types
  - Reduced cognitive load per module
  - Preserved all 7 existing tools with full functionality

The architecture now supports clean expansion for format-specific tools while
maintaining backward compatibility through the legacy server backup.
This commit is contained in:
parent
778ef3a2d4
commit
9d6a9fc24c
8
src/mcp_office_tools/mixins/__init__.py
Normal file
8
src/mcp_office_tools/mixins/__init__.py
Normal file
@ -0,0 +1,8 @@
|
|||||||
|
"""MCP Office Tools Mixins - Organized tool groupings by file type."""
|
||||||
|
|
||||||
|
from .universal import UniversalMixin
|
||||||
|
from .word import WordMixin
|
||||||
|
from .excel import ExcelMixin
|
||||||
|
from .powerpoint import PowerPointMixin
|
||||||
|
|
||||||
|
__all__ = ["UniversalMixin", "WordMixin", "ExcelMixin", "PowerPointMixin"]
|
61
src/mcp_office_tools/mixins/excel.py
Normal file
61
src/mcp_office_tools/mixins/excel.py
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
"""Excel Document Tools Mixin - Specialized tools for Excel spreadsheet processing."""
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastmcp import FastMCP
|
||||||
|
from pydantic import Field
|
||||||
|
|
||||||
|
from ..utils import OfficeFileError
|
||||||
|
|
||||||
|
|
||||||
|
class ExcelMixin:
    """Mixin providing Excel-specific tools for advanced spreadsheet processing.

    This class is currently a placeholder; it registers no tools yet. It
    exists so the server architecture has a dedicated home for future
    Excel-only capabilities, such as:

    - Formula extraction and analysis
    - Sheet-by-sheet processing
    - Chart data extraction
    - Pivot table analysis
    - Data validation rules
    - Conditional formatting analysis
    """

    def __init__(self, app: FastMCP):
        # Keep a handle on the FastMCP app so tools can be registered
        # against it as they are implemented.
        self.app = app
        self._register_tools()

    def _register_tools(self):
        """Register Excel-specific tools with the FastMCP app.

        No Excel-only tools exist yet. Planned registrations will look like:

            self.app.tool()(self.extract_formulas)
            self.app.tool()(self.analyze_charts)
            self.app.tool()(self.extract_pivot_tables)
        """
        # Intentionally a no-op until Excel-specific tools are implemented.

    # --- Planned future tools (signatures sketched for reference) ---------
    #
    # async def extract_formulas(
    #     self,
    #     file_path: str = Field(description="Path to Excel document or URL"),
    #     include_values: bool = Field(default=True, description="Include calculated values alongside formulas"),
    #     sheet_names: list[str] = Field(default=[], description="Specific sheets to process (empty = all sheets)")
    # ) -> dict[str, Any]:
    #     """Extract formulas from Excel spreadsheets with calculated values."""
    #
    # async def analyze_charts(
    #     self,
    #     file_path: str = Field(description="Path to Excel document or URL"),
    #     extract_data: bool = Field(default=True, description="Extract underlying chart data"),
    #     include_formatting: bool = Field(default=False, description="Include chart formatting information")
    # ) -> dict[str, Any]:
    #     """Analyze and extract Excel charts with their underlying data."""
    #
    # async def extract_pivot_tables(
    #     self,
    #     file_path: str = Field(description="Path to Excel document or URL"),
    #     include_source_data: bool = Field(default=True, description="Include pivot table source data ranges")
    # ) -> dict[str, Any]:
    #     """Extract pivot table configurations and data."""
|
60
src/mcp_office_tools/mixins/powerpoint.py
Normal file
60
src/mcp_office_tools/mixins/powerpoint.py
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
"""PowerPoint Document Tools Mixin - Specialized tools for PowerPoint presentation processing."""
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastmcp import FastMCP
|
||||||
|
from pydantic import Field
|
||||||
|
|
||||||
|
from ..utils import OfficeFileError
|
||||||
|
|
||||||
|
|
||||||
|
class PowerPointMixin:
    """Mixin providing PowerPoint-specific tools for advanced presentation processing.

    This class is currently a placeholder; it registers no tools yet. It
    exists so the server architecture has a dedicated home for future
    PowerPoint-only capabilities, such as:

    - Slide-by-slide processing
    - Speaker notes extraction
    - Animation analysis
    - Slide transition details
    - Master slide template analysis
    - Presentation structure analysis
    """

    def __init__(self, app: FastMCP):
        # Keep a handle on the FastMCP app so tools can be registered
        # against it as they are implemented.
        self.app = app
        self._register_tools()

    def _register_tools(self):
        """Register PowerPoint-specific tools with the FastMCP app.

        No PowerPoint-only tools exist yet. Planned registrations will look like:

            self.app.tool()(self.extract_speaker_notes)
            self.app.tool()(self.analyze_slide_structure)
            self.app.tool()(self.extract_animations)
        """
        # Intentionally a no-op until PowerPoint-specific tools are implemented.

    # --- Planned future tools (signatures sketched for reference) ---------
    #
    # async def extract_speaker_notes(
    #     self,
    #     file_path: str = Field(description="Path to PowerPoint document or URL"),
    #     slide_range: str = Field(default="", description="Slide range to process (e.g., '1-5', '3', '1,3,5-10')")
    # ) -> dict[str, Any]:
    #     """Extract speaker notes from PowerPoint slides."""
    #
    # async def analyze_slide_structure(
    #     self,
    #     file_path: str = Field(description="Path to PowerPoint document or URL"),
    #     include_layouts: bool = Field(default=True, description="Include slide layout information"),
    #     include_masters: bool = Field(default=False, description="Include master slide analysis")
    # ) -> dict[str, Any]:
    #     """Analyze PowerPoint slide structure and layout patterns."""
    #
    # async def extract_animations(
    #     self,
    #     file_path: str = Field(description="Path to PowerPoint document or URL"),
    #     include_timings: bool = Field(default=True, description="Include animation timing information")
    # ) -> dict[str, Any]:
    #     """Extract animation and transition information from PowerPoint slides."""
|
342
src/mcp_office_tools/mixins/universal.py
Normal file
342
src/mcp_office_tools/mixins/universal.py
Normal file
@ -0,0 +1,342 @@
|
|||||||
|
"""Universal Office Tools Mixin - Format-agnostic tools that work across all Office document types."""
|
||||||
|
|
||||||
|
import time
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastmcp import FastMCP
|
||||||
|
from pydantic import Field
|
||||||
|
|
||||||
|
from ..utils import (
|
||||||
|
OfficeFileError,
|
||||||
|
classify_document_type,
|
||||||
|
detect_format,
|
||||||
|
get_supported_extensions,
|
||||||
|
resolve_office_file_path,
|
||||||
|
validate_office_file,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class UniversalMixin:
    """Mixin containing format-agnostic tools that work across Word, Excel, PowerPoint, and CSV files.

    Each public async method is registered as a FastMCP tool in
    _register_tools(). Category-specific heavy lifting is delegated to the
    private _extract_*_by_category helpers, which in turn import from
    ..server_monolithic (the original monolithic implementation).
    """

    def __init__(self, app: FastMCP):
        # Store the FastMCP app and immediately register all universal tools.
        self.app = app
        self._register_tools()

    def _register_tools(self):
        """Register universal tools with the FastMCP app."""
        self.app.tool()(self.extract_text)
        self.app.tool()(self.extract_images)
        self.app.tool()(self.extract_metadata)
        self.app.tool()(self.detect_office_format)
        self.app.tool()(self.analyze_document_health)
        self.app.tool()(self.get_supported_formats)

    async def extract_text(
        self,
        file_path: str = Field(description="Path to Office document or URL"),
        preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"),
        include_metadata: bool = Field(default=True, description="Include document metadata in output"),
        method: str = Field(default="auto", description="Extraction method: auto, primary, fallback")
    ) -> dict[str, Any]:
        """Extract text content from Office documents with intelligent method selection.

        Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt),
        and CSV files. Uses multi-library fallback for maximum compatibility.

        Returns a dict with "text", "metadata" (timing, method used), optionally
        "structure" (when preserve_formatting finds formatted sections) and
        "document_metadata" (when include_metadata is true).

        Raises OfficeFileError on invalid files or extraction failure.
        """
        start_time = time.time()

        try:
            # Resolve file path (download if URL)
            local_path = await resolve_office_file_path(file_path)

            # Validate file
            validation = await validate_office_file(local_path)
            if not validation["is_valid"]:
                raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

            # Get format info
            format_info = await detect_format(local_path)
            category = format_info["category"]
            extension = format_info["extension"]

            # Extract text based on category with fallback
            text_result = await self._extract_text_by_category(local_path, extension, category, preserve_formatting, method)

            # Build response
            result = {
                "text": text_result["text"],
                "metadata": {
                    "original_file": file_path,
                    "format": format_info["format_name"],
                    "extraction_method": text_result["method_used"],
                    "extraction_time": round(time.time() - start_time, 3),
                    # Defaults to the single method used when the extractor
                    # did not report a "methods_tried" list.
                    "methods_tried": text_result.get("methods_tried", [text_result["method_used"]])
                }
            }

            # Add formatted sections if preserved
            if preserve_formatting and "formatted_sections" in text_result:
                result["structure"] = text_result["formatted_sections"]

            # Add metadata if requested
            if include_metadata:
                doc_metadata = await self._extract_basic_metadata(local_path, extension, category)
                result["document_metadata"] = doc_metadata

            return result

        except OfficeFileError:
            # Re-raise our own errors untouched so callers see the original message.
            raise
        except Exception as e:
            # Wrap any unexpected failure in the package's error type.
            raise OfficeFileError(f"Text extraction failed: {str(e)}")

    async def extract_images(
        self,
        file_path: str = Field(description="Path to Office document or URL"),
        min_width: int = Field(default=100, description="Minimum image width in pixels"),
        min_height: int = Field(default=100, description="Minimum image height in pixels"),
        output_format: str = Field(default="png", description="Output image format: png, jpg, jpeg"),
        include_metadata: bool = Field(default=True, description="Include image metadata")
    ) -> dict[str, Any]:
        """Extract images from Office documents with size filtering and format conversion.

        Returns a dict with "images" (list of extracted images) and "metadata"
        (image count, timing, and the filters that were applied).

        Raises OfficeFileError on invalid files or extraction failure.
        """
        start_time = time.time()

        try:
            # Resolve file path
            local_path = await resolve_office_file_path(file_path)

            # Validate file
            validation = await validate_office_file(local_path)
            if not validation["is_valid"]:
                raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

            # Get format info
            format_info = await detect_format(local_path)
            category = format_info["category"]
            extension = format_info["extension"]

            # Extract images based on category
            images = await self._extract_images_by_category(local_path, extension, category, output_format, min_width, min_height)

            return {
                "images": images,
                "metadata": {
                    "original_file": file_path,
                    "format": format_info["format_name"],
                    "image_count": len(images),
                    "extraction_time": round(time.time() - start_time, 3),
                    "filters_applied": {
                        "min_width": min_width,
                        "min_height": min_height,
                        "output_format": output_format
                    }
                }
            }

        except OfficeFileError:
            raise
        except Exception as e:
            raise OfficeFileError(f"Image extraction failed: {str(e)}")

    async def extract_metadata(
        self,
        file_path: str = Field(description="Path to Office document or URL")
    ) -> dict[str, Any]:
        """Extract comprehensive metadata from Office documents.

        Combines basic metadata with category-specific metadata (Word/Excel/
        PowerPoint) and appends an "extraction_info" section with timing and
        the detected format.

        Raises OfficeFileError on invalid files or extraction failure.
        """
        start_time = time.time()

        try:
            # Resolve file path
            local_path = await resolve_office_file_path(file_path)

            # Validate file
            validation = await validate_office_file(local_path)
            if not validation["is_valid"]:
                raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

            # Get format info
            format_info = await detect_format(local_path)
            category = format_info["category"]
            extension = format_info["extension"]

            # Extract metadata based on category
            metadata = await self._extract_metadata_by_category(local_path, extension, category)

            # Add extraction info
            metadata["extraction_info"] = {
                "extraction_time": round(time.time() - start_time, 3),
                "format_detected": format_info["format_name"]
            }

            return metadata

        except OfficeFileError:
            raise
        except Exception as e:
            raise OfficeFileError(f"Metadata extraction failed: {str(e)}")

    async def detect_office_format(
        self,
        file_path: str = Field(description="Path to Office document or URL")
    ) -> dict[str, Any]:
        """Intelligent Office document format detection and analysis.

        Returns the detect_format() result merged with the document-type
        classification from classify_document_type().

        Raises OfficeFileError on any failure (no validation step here —
        detection is expected to work on files that fail validation).
        """
        try:
            # Resolve file path
            local_path = await resolve_office_file_path(file_path)

            # Get comprehensive format detection
            format_info = await detect_format(local_path)

            # Add classification
            classification = await classify_document_type(local_path)
            format_info.update(classification)

            return format_info

        except Exception as e:
            raise OfficeFileError(f"Format detection failed: {str(e)}")

    async def analyze_document_health(
        self,
        file_path: str = Field(description="Path to Office document or URL")
    ) -> dict[str, Any]:
        """Comprehensive document health and integrity analysis.

        Unlike the other tools, this never raises: analysis errors are
        reported in the returned dict with overall_health == "error".
        """
        start_time = time.time()

        try:
            # Resolve file path
            local_path = await resolve_office_file_path(file_path)

            # Validate file thoroughly
            validation = await validate_office_file(local_path)

            # Get format detection
            format_info = await detect_format(local_path)

            # Build health report
            health_report = {
                "overall_health": "healthy" if validation["is_valid"] else "unhealthy",
                "validation": validation,
                "format_info": format_info,
                "analysis_time": round(time.time() - start_time, 3)
            }

            # Add recommendations
            if not validation["is_valid"]:
                health_report["recommendations"] = [
                    "File validation failed - check for corruption",
                    "Try opening file in native application",
                    "Consider file recovery tools if data is critical"
                ]
            else:
                health_report["recommendations"] = [
                    "File appears healthy and readable",
                    "All validation checks passed"
                ]

            return health_report

        except Exception as e:
            # Errors become part of the report rather than propagating.
            return {
                "overall_health": "error",
                "error": str(e),
                "analysis_time": round(time.time() - start_time, 3),
                "recommendations": [
                    "File could not be analyzed",
                    "Check file path and permissions",
                    "Verify file is not corrupted"
                ]
            }

    async def get_supported_formats(self) -> dict[str, Any]:
        """Get list of all supported Office document formats and their capabilities.

        Builds a per-extension capability table (category, legacy flag, which
        extraction features apply) plus a category index. Purely computed —
        no file I/O.
        """
        extensions = get_supported_extensions()

        format_details = {}
        for ext in extensions:
            # Categorize by extension prefix; .csv is grouped under excel.
            if ext.startswith('.doc'):
                category = "word"
                legacy = ext == ".doc"
            elif ext.startswith('.xls') or ext == '.csv':
                category = "excel"
                legacy = ext == ".xls"
            elif ext.startswith('.ppt'):
                category = "powerpoint"
                legacy = ext == ".ppt"
            else:
                category = "other"
                legacy = False

            format_details[ext] = {
                "category": category,
                "legacy_format": legacy,
                "text_extraction": True,
                # CSV files carry no embedded images.
                "image_extraction": ext != ".csv",
                "metadata_extraction": True,
                # Markdown conversion is Word-only (see WordMixin).
                "markdown_conversion": category == "word"
            }

        return {
            "supported_extensions": extensions,
            "format_details": format_details,
            "categories": {
                "word": [ext for ext, info in format_details.items() if info["category"] == "word"],
                "excel": [ext for ext, info in format_details.items() if info["category"] == "excel"],
                "powerpoint": [ext for ext, info in format_details.items() if info["category"] == "powerpoint"]
            },
            "total_formats": len(extensions)
        }

    # Helper methods - these will be imported from the original server.py
    async def _extract_text_by_category(self, file_path: str, extension: str, category: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
        """Extract text based on document category."""
        # Import the appropriate extraction function lazily to avoid a
        # circular import at module load time.
        from ..server_monolithic import _extract_word_text, _extract_excel_text, _extract_powerpoint_text

        if category == "word":
            return await _extract_word_text(file_path, extension, preserve_formatting, method)
        elif category == "excel":
            return await _extract_excel_text(file_path, extension, preserve_formatting, method)
        elif category == "powerpoint":
            return await _extract_powerpoint_text(file_path, extension, preserve_formatting, method)
        else:
            raise OfficeFileError(f"Unsupported document category: {category}")

    async def _extract_images_by_category(self, file_path: str, extension: str, category: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
        """Extract images based on document category."""
        from ..server_monolithic import _extract_word_images, _extract_excel_images, _extract_powerpoint_images

        if category == "word":
            return await _extract_word_images(file_path, extension, output_format, min_width, min_height)
        elif category == "excel":
            return await _extract_excel_images(file_path, extension, output_format, min_width, min_height)
        elif category == "powerpoint":
            return await _extract_powerpoint_images(file_path, extension, output_format, min_width, min_height)
        else:
            return []  # CSV and other formats don't support images

    async def _extract_metadata_by_category(self, file_path: str, extension: str, category: str) -> dict[str, Any]:
        """Extract metadata based on document category."""
        from ..server_monolithic import _extract_word_metadata, _extract_excel_metadata, _extract_powerpoint_metadata, _extract_basic_metadata

        # Get basic metadata first
        metadata = await _extract_basic_metadata(file_path, extension, category)

        # Add category-specific metadata
        if category == "word":
            specific_metadata = await _extract_word_metadata(file_path, extension)
        elif category == "excel":
            specific_metadata = await _extract_excel_metadata(file_path, extension)
        elif category == "powerpoint":
            specific_metadata = await _extract_powerpoint_metadata(file_path, extension)
        else:
            specific_metadata = {}

        metadata.update(specific_metadata)
        return metadata

    async def _extract_basic_metadata(self, file_path: str, extension: str, category: str) -> dict[str, Any]:
        """Extract basic metadata common to all documents."""
        from ..server_monolithic import _extract_basic_metadata
        return await _extract_basic_metadata(file_path, extension, category)
|
198
src/mcp_office_tools/mixins/word.py
Normal file
198
src/mcp_office_tools/mixins/word.py
Normal file
@ -0,0 +1,198 @@
|
|||||||
|
"""Word Document Tools Mixin - Specialized tools for Word document processing."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from fastmcp import FastMCP
|
||||||
|
from pydantic import Field
|
||||||
|
|
||||||
|
from ..utils import OfficeFileError, resolve_office_file_path, validate_office_file, detect_format
|
||||||
|
|
||||||
|
|
||||||
|
class WordMixin:
    """Mixin containing Word-specific tools for advanced document processing.

    Registers convert_to_markdown as a FastMCP tool. The conversion helpers
    at the bottom delegate to ..server_monolithic, keeping this module thin.
    """

    def __init__(self, app: FastMCP):
        # Store the FastMCP app and register Word-specific tools.
        self.app = app
        self._register_tools()

    def _register_tools(self):
        """Register Word-specific tools with the FastMCP app."""
        self.app.tool()(self.convert_to_markdown)

    async def convert_to_markdown(
        self,
        file_path: str = Field(description="Path to Office document or URL"),
        include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"),
        image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"),
        max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"),
        preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"),
        page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). RECOMMENDED for large documents. Empty = all pages"),
        bookmark_name: str = Field(default="", description="Extract content for a specific bookmark/chapter (e.g., 'Chapter1_Start'). More reliable than page ranges."),
        chapter_name: str = Field(default="", description="Extract content for a chapter by heading text (e.g., 'Chapter 1', 'Introduction'). Works when bookmarks aren't available."),
        summary_only: bool = Field(default=False, description="Return only metadata and truncated summary. STRONGLY RECOMMENDED for large docs (>10 pages)"),
        output_dir: str = Field(default="", description="Output directory for image files (if image_mode='files')")
    ) -> dict[str, Any]:
        """Convert Office documents to Markdown format with intelligent processing recommendations.

        ⚠️ RECOMMENDED WORKFLOW FOR LARGE DOCUMENTS (>5 pages):
        1. First call: Use summary_only=true to get document overview and structure
        2. Then: Use page_range (e.g., "1-10", "15-25") to process specific sections

        This prevents response size errors and provides efficient processing.
        Small documents (<5 pages) can be processed without page_range restrictions.

        Only Word documents (.docx/.doc) are supported; other categories raise
        OfficeFileError. bookmark_name/chapter_name take precedence over
        page_range when both are given.
        """
        start_time = time.time()

        try:
            # Resolve file path
            local_path = await resolve_office_file_path(file_path)

            # Validate file
            validation = await validate_office_file(local_path)
            if not validation["is_valid"]:
                raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

            # Get format info
            format_info = await detect_format(local_path)
            category = format_info["category"]
            extension = format_info["extension"]

            # Currently focused on Word documents for markdown conversion
            if category != "word":
                raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}")

            # Analyze document size and provide intelligent recommendations
            doc_analysis = await self._analyze_document_size(local_path, extension)
            processing_recommendation = self._get_processing_recommendation(
                doc_analysis, page_range, summary_only
            )

            # Parse page range if provided
            page_numbers = self._parse_page_range(page_range) if page_range else None

            # Prioritize bookmark/chapter extraction over page ranges
            if bookmark_name or chapter_name:
                page_numbers = None  # Ignore page ranges when bookmark or chapter is specified

            # Convert to markdown based on format
            if extension == ".docx":
                markdown_result = await self._convert_docx_to_markdown(
                    local_path, include_images, image_mode, max_image_size,
                    preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
                )
            else:  # .doc
                # For legacy .doc files, use mammoth if available
                # NOTE(review): the legacy path takes no bookmark/chapter args —
                # presumably unsupported for .doc; confirm in server_monolithic.
                markdown_result = await self._convert_doc_to_markdown(
                    local_path, include_images, image_mode, max_image_size,
                    preserve_structure, page_numbers, summary_only, output_dir
                )

            # Build result based on mode
            result = {
                "metadata": {
                    "original_file": os.path.basename(local_path),
                    "format": format_info["format_name"],
                    "conversion_method": markdown_result["method_used"],
                    "conversion_time": round(time.time() - start_time, 3),
                    "summary_only": summary_only,
                    "document_analysis": doc_analysis,
                    "processing_recommendation": processing_recommendation
                }
            }

            # Add page range info if used
            if page_range:
                result["metadata"]["page_range"] = page_range
                result["metadata"]["pages_processed"] = len(page_numbers) if page_numbers else 0

            # Add content based on mode
            if summary_only:
                # VERY restrictive summary mode to prevent massive responses
                result["metadata"]["character_count"] = len(markdown_result["content"])
                result["metadata"]["word_count"] = len(markdown_result["content"].split())

                # Ultra-short summary (only 500 chars max)
                result["summary"] = markdown_result["content"][:500] + "..." if len(markdown_result["content"]) > 500 else markdown_result["content"]

                # Severely limit table of contents to prevent 1M+ token responses
                if "table_of_contents" in markdown_result:
                    toc = markdown_result["table_of_contents"]
                    if isinstance(toc, dict):
                        # Keep only essential TOC info, severely truncated
                        result["table_of_contents"] = {
                            "note": toc.get("note", ""),
                            "basic_info": toc.get("basic_info", "")[:200],  # Limit to 200 chars
                        }
                        # Add bookmark/heading info if available (limit to first 5 items)
                        if "bookmarks" in toc:
                            result["table_of_contents"]["bookmarks"] = toc["bookmarks"][:5]
                            result["table_of_contents"]["bookmark_count"] = toc.get("bookmark_count", 0)
                        if "available_headings" in toc:
                            result["table_of_contents"]["available_headings"] = toc["available_headings"][:5]
                            result["table_of_contents"]["heading_count"] = toc.get("heading_count", 0)
                    else:
                        result["table_of_contents"] = {"note": "Summary mode - use full processing for detailed TOC"}
            else:
                # Full content mode
                result["markdown"] = markdown_result["content"]
                result["content_truncated"] = len(markdown_result["content"]) >= 200000  # Warn if near limit

                # Add images info
                if "images" in markdown_result:
                    result["images"] = markdown_result["images"]

                # Add structure info
                if "structure" in markdown_result:
                    result["structure"] = markdown_result["structure"]

                # Add table of contents if available
                if "table_of_contents" in markdown_result:
                    result["table_of_contents"] = markdown_result["table_of_contents"]

            return result

        except OfficeFileError:
            # Re-raise our own errors untouched so callers see the original message.
            raise
        except Exception as e:
            raise OfficeFileError(f"Markdown conversion failed: {str(e)}")

    # Helper methods - import from monolithic server
    # (lazy imports avoid circular imports at module load time)
    async def _analyze_document_size(self, file_path: str, extension: str) -> dict[str, Any]:
        """Analyze document size for processing recommendations."""
        from ..server_monolithic import _analyze_document_size
        return await _analyze_document_size(file_path, extension)

    def _get_processing_recommendation(self, doc_analysis: dict[str, Any], page_range: str, summary_only: bool) -> dict[str, Any]:
        """Get processing recommendations based on document analysis."""
        from ..server_monolithic import _get_processing_recommendation
        return _get_processing_recommendation(doc_analysis, page_range, summary_only)

    def _parse_page_range(self, page_range: str) -> list[int]:
        """Parse page range string into list of page numbers."""
        from ..server_monolithic import _parse_page_range
        return _parse_page_range(page_range)

    async def _convert_docx_to_markdown(
        self, file_path: str, include_images: bool, image_mode: str, max_image_size: int,
        preserve_structure: bool, page_numbers: list[int], summary_only: bool, output_dir: str,
        bookmark_name: str = "", chapter_name: str = ""
    ) -> dict[str, Any]:
        """Convert .docx to markdown."""
        from ..server_monolithic import _convert_docx_to_markdown
        return await _convert_docx_to_markdown(
            file_path, include_images, image_mode, max_image_size,
            preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
        )

    async def _convert_doc_to_markdown(
        self, file_path: str, include_images: bool, image_mode: str, max_image_size: int,
        preserve_structure: bool, page_numbers: list[int], summary_only: bool, output_dir: str
    ) -> dict[str, Any]:
        """Convert legacy .doc to markdown."""
        from ..server_monolithic import _convert_doc_to_markdown
        return await _convert_doc_to_markdown(
            file_path, include_images, image_mode, max_image_size,
            preserve_structure, page_numbers, summary_only, output_dir
        )
|
7
src/mcp_office_tools/processors/__init__.py
Normal file
7
src/mcp_office_tools/processors/__init__.py
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
"""Office Document Processors - Helper functions for document processing."""
|
||||||
|
|
||||||
|
# Import all processor functions to make them available
|
||||||
|
from .word_processor import *
|
||||||
|
from .excel_processor import *
|
||||||
|
from .powerpoint_processor import *
|
||||||
|
from .universal_processor import *
|
File diff suppressed because it is too large
Load Diff
2209
src/mcp_office_tools/server_legacy.py
Normal file
2209
src/mcp_office_tools/server_legacy.py
Normal file
File diff suppressed because it is too large
Load Diff
2209
src/mcp_office_tools/server_monolithic.py
Normal file
2209
src/mcp_office_tools/server_monolithic.py
Normal file
File diff suppressed because it is too large
Load Diff
Loading…
x
Reference in New Issue
Block a user