Ryan Malloy a485e05759 Implement true page-range filtering for efficient processing
- Add page break detection using Word XML structure
- Process only specified pages instead of full document + truncation
- Route page-range requests to python-docx for granular control
- Skip mammoth for page-specific processing (mammoth processes full doc)
- Add page metadata to results when filtering is used
- Significantly reduce memory usage and response size for large documents
2025-08-19 13:12:19 -06:00

1554 lines
56 KiB
Python

"""MCP Office Tools Server - Comprehensive Microsoft Office document processing.
FastMCP server providing 30+ tools for processing Word, Excel, PowerPoint documents
including both modern formats (.docx, .xlsx, .pptx) and legacy formats (.doc, .xls, .ppt).
"""
import os
import tempfile
import time
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from pydantic import Field
from .utils import (
OfficeFileError,
classify_document_type,
detect_format,
get_supported_extensions,
resolve_office_file_path,
validate_office_file,
)
# Initialize FastMCP app
app = FastMCP("MCP Office Tools")
# Configuration
TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir())
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
@app.tool()
async def extract_text(
    file_path: str = Field(description="Path to Office document or URL"),
    preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"),
    include_metadata: bool = Field(default=True, description="Include document metadata in output"),
    method: str = Field(default="auto", description="Extraction method: auto, primary, fallback")
) -> dict[str, Any]:
    """Extract text content from Office documents with intelligent method selection.

    Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt),
    and CSV files. Uses multi-library fallback for maximum compatibility.
    """
    started = time.time()
    try:
        # Download URL inputs to a local path, then refuse invalid files early.
        local_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
        format_info = await detect_format(local_path)
        category = format_info["category"]
        extension = format_info["extension"]
        # Dispatch to the category-specific extractor.
        extractors = {
            "word": _extract_word_text,
            "excel": _extract_excel_text,
            "powerpoint": _extract_powerpoint_text,
        }
        extractor = extractors.get(category)
        if extractor is None:
            raise OfficeFileError(f"Unsupported document category: {category}")
        text_result = await extractor(local_path, extension, preserve_formatting, method)
        extracted = text_result["text"]
        result = {
            "text": extracted,
            "method_used": text_result["method_used"],
            "character_count": len(extracted),
            "word_count": len(extracted.split()) if extracted else 0,
            "extraction_time": round(time.time() - started, 3),
            "format_info": {
                "format": format_info["format_name"],
                "category": category,
                "is_legacy": format_info["is_legacy"]
            }
        }
        if include_metadata:
            result["metadata"] = await _extract_basic_metadata(local_path, extension, category)
        if preserve_formatting:
            result["formatted_sections"] = text_result.get("formatted_sections", [])
        return result
    except Exception as e:
        if DEBUG:
            import traceback
            traceback.print_exc()
        # Wrap every failure in the package's single error type.
        raise OfficeFileError(f"Text extraction failed: {str(e)}")
@app.tool()
async def extract_images(
    file_path: str = Field(description="Path to Office document or URL"),
    output_format: str = Field(default="png", description="Output image format: png, jpg, jpeg"),
    min_width: int = Field(default=100, description="Minimum image width in pixels"),
    min_height: int = Field(default=100, description="Minimum image height in pixels"),
    include_metadata: bool = Field(default=True, description="Include image metadata")
) -> dict[str, Any]:
    """Extract images from Office documents with size filtering and format conversion."""
    started = time.time()
    try:
        # Fetch remote documents first, then validate before any processing.
        local_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
        format_info = await detect_format(local_path)
        category = format_info["category"]
        extension = format_info["extension"]
        # Route to the category-specific image extractor.
        handlers = {
            "word": _extract_word_images,
            "excel": _extract_excel_images,
            "powerpoint": _extract_powerpoint_images,
        }
        handler = handlers.get(category)
        if handler is None:
            raise OfficeFileError(f"Image extraction not supported for category: {category}")
        images = await handler(local_path, extension, output_format, min_width, min_height)
        result = {
            "images": images,
            "image_count": len(images),
            "extraction_time": round(time.time() - started, 3),
            "format_info": {
                "format": format_info["format_name"],
                "category": category
            }
        }
        if include_metadata:
            # Aggregate byte size across all extracted images.
            result["total_size_bytes"] = sum(img.get("size_bytes", 0) for img in images)
        return result
    except Exception as e:
        if DEBUG:
            import traceback
            traceback.print_exc()
        raise OfficeFileError(f"Image extraction failed: {str(e)}")
@app.tool()
async def extract_metadata(
    file_path: str = Field(description="Path to Office document or URL")
) -> dict[str, Any]:
    """Extract comprehensive metadata from Office documents."""
    started = time.time()
    try:
        local_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
        format_info = await detect_format(local_path)
        category = format_info["category"]
        extension = format_info["extension"]
        # Category-specific document properties; anything else gets a stub.
        if category == "word":
            metadata = await _extract_word_metadata(local_path, extension)
        elif category == "excel":
            metadata = await _extract_excel_metadata(local_path, extension)
        elif category == "powerpoint":
            metadata = await _extract_powerpoint_metadata(local_path, extension)
        else:
            metadata = {"category": category, "basic_info": "Limited metadata available"}
        # Attach filesystem facts alongside the document properties.
        path = Path(local_path)
        stat = path.stat()
        return {
            "document_metadata": metadata,
            "file_metadata": {
                "filename": path.name,
                "file_size": stat.st_size,
                # NOTE(review): st_ctime is creation time on Windows but
                # metadata-change time on Unix — confirm which is intended.
                "created": stat.st_ctime,
                "modified": stat.st_mtime,
                "extension": extension
            },
            "format_info": format_info,
            "extraction_time": round(time.time() - started, 3)
        }
    except Exception as e:
        if DEBUG:
            import traceback
            traceback.print_exc()
        raise OfficeFileError(f"Metadata extraction failed: {str(e)}")
@app.tool()
async def detect_office_format(
    file_path: str = Field(description="Path to Office document or URL")
) -> dict[str, Any]:
    """Intelligent Office document format detection and analysis."""
    started = time.time()
    try:
        # URLs are downloaded to a local path before inspection.
        local_path = await resolve_office_file_path(file_path)
        format_info = await detect_format(local_path)
        classification = await classify_document_type(local_path)
        return {
            "format_detection": format_info,
            "document_classification": classification,
            "supported": format_info["is_supported"],
            # Hints are optional; default to an empty list when absent.
            "processing_recommendations": format_info.get("processing_hints", []),
            "detection_time": round(time.time() - started, 3)
        }
    except Exception as e:
        if DEBUG:
            import traceback
            traceback.print_exc()
        raise OfficeFileError(f"Format detection failed: {str(e)}")
@app.tool()
async def analyze_document_health(
    file_path: str = Field(description="Path to Office document or URL")
) -> dict[str, Any]:
    """Comprehensive document health and integrity analysis."""
    started = time.time()
    try:
        local_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(local_path)
        format_info = await detect_format(local_path)
        health_score = _calculate_health_score(validation, format_info)
        # Bucket the numeric score into a coarse verdict.
        if validation["is_valid"] and health_score >= 8:
            overall = "healthy"
        elif health_score >= 5:
            overall = "warning"
        else:
            overall = "problematic"
        return {
            "overall_health": overall,
            "health_score": health_score,
            "validation_results": validation,
            "format_analysis": format_info,
            "recommendations": _get_health_recommendations(validation, format_info),
            "analysis_time": round(time.time() - started, 3)
        }
    except Exception as e:
        if DEBUG:
            import traceback
            traceback.print_exc()
        raise OfficeFileError(f"Health analysis failed: {str(e)}")
@app.tool()
async def convert_to_markdown(
    file_path: str = Field(description="Path to Office document or URL"),
    include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"),
    image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"),
    max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"),
    preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"),
    page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). Empty = all pages"),
    summary_only: bool = Field(default=False, description="Return only metadata and structure summary (for large docs)"),
    output_dir: str = Field(default="", description="Output directory for image files (if image_mode='files')")
) -> dict[str, Any]:
    """Convert Office documents to Markdown format with page-range support and structure preservation.

    Supports page-based chunking for large documents and summary mode for quick overview.
    Use page_range to process specific pages only, or summary_only=true for large documents.

    Returns:
        Dict with a "metadata" section (file, method, timing) plus either
        "markdown" (full mode) or "summary" (summary mode), and optional
        "images"/"structure" keys.

    Raises:
        OfficeFileError: For invalid files, non-Word categories, or any
            underlying conversion failure.
    """
    start_time = time.time()
    try:
        # Resolve file path (download if URL)
        local_path = await resolve_office_file_path(file_path)
        # Validate file
        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
        # Get format info
        format_info = await detect_format(local_path)
        category = format_info["category"]
        extension = format_info["extension"]
        # Currently focused on Word documents for markdown conversion
        if category != "word":
            raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}")
        # Parse page range if provided; None means "convert all pages"
        page_numbers = _parse_page_range(page_range) if page_range else None
        # Convert to markdown based on format
        if extension == ".docx":
            markdown_result = await _convert_docx_to_markdown(
                local_path, include_images, image_mode, max_image_size,
                preserve_structure, page_numbers, summary_only, output_dir
            )
        else:  # .doc
            # For legacy .doc files, use mammoth if available
            markdown_result = await _convert_doc_to_markdown(
                local_path, include_images, image_mode, max_image_size,
                preserve_structure, page_numbers, summary_only, output_dir
            )
        # Build result based on mode
        result = {
            "metadata": {
                "original_file": os.path.basename(local_path),
                "format": format_info["format_name"],
                "conversion_method": markdown_result["method_used"],
                "conversion_time": round(time.time() - start_time, 3),
                "summary_only": summary_only
            }
        }
        # Add page range info if used
        if page_range:
            result["metadata"]["page_range"] = page_range
            result["metadata"]["pages_processed"] = len(page_numbers) if page_numbers else 0
        # Add content based on mode
        if summary_only:
            # Only include summary information for large documents
            result["metadata"]["character_count"] = len(markdown_result["content"])
            result["metadata"]["word_count"] = len(markdown_result["content"].split())
            # Precedence note: parses as (content[:1000] + "...") when content
            # exceeds 1000 chars, otherwise the full content — intended.
            result["summary"] = markdown_result["content"][:1000] + "..." if len(markdown_result["content"]) > 1000 else markdown_result["content"]
        else:
            # Include full content for smaller documents or page ranges
            result["markdown"] = markdown_result["content"]
            result["metadata"]["character_count"] = len(markdown_result["content"])
            result["metadata"]["word_count"] = len(markdown_result["content"].split())
        # Add image info
        if include_images and markdown_result.get("images"):
            result["images"] = markdown_result["images"]
            result["metadata"]["image_count"] = len(markdown_result["images"])
            result["metadata"]["total_image_size"] = sum(
                img.get("size_bytes", 0) for img in markdown_result["images"]
            )
        # Add structure info
        if preserve_structure and markdown_result.get("structure"):
            result["structure"] = markdown_result["structure"]
        return result
    except Exception as e:
        if DEBUG:
            import traceback
            traceback.print_exc()
        raise OfficeFileError(f"Markdown conversion failed: {str(e)}")
@app.tool()
async def get_supported_formats() -> dict[str, Any]:
    """Get list of all supported Office document formats and their capabilities.

    Returns:
        Dict with the supported extensions, per-extension details,
        extensions grouped by category, and the total format count.
    """
    # Hoisted out of the loop: the import is loop-invariant, so re-executing
    # the import statement per extension was pure overhead.
    from .utils.validation import get_format_info
    extensions = get_supported_extensions()
    format_details = {}
    for ext in extensions:
        info = get_format_info(ext)
        if info:
            format_details[ext] = {
                "format_name": info["format_name"],
                "category": info["category"],
                "mime_types": info["mime_types"]
            }
    return {
        "supported_extensions": extensions,
        "format_details": format_details,
        # Group extensions by their document category.
        "categories": {
            "word": [ext for ext, info in format_details.items() if info["category"] == "word"],
            "excel": [ext for ext, info in format_details.items() if info["category"] == "excel"],
            "powerpoint": [ext for ext, info in format_details.items() if info["category"] == "powerpoint"]
        },
        "total_formats": len(extensions)
    }
# Helper functions for text extraction
async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
"""Extract text from Word documents with fallback methods."""
methods_tried = []
# Method selection
if method == "auto":
if extension == ".docx":
method_order = ["python-docx", "mammoth", "docx2txt"]
else: # .doc
method_order = ["olefile", "mammoth", "docx2txt"]
elif method == "primary":
method_order = ["python-docx"] if extension == ".docx" else ["olefile"]
else: # fallback
method_order = ["mammoth", "docx2txt"]
text = ""
formatted_sections = []
method_used = None
for method_name in method_order:
try:
methods_tried.append(method_name)
if method_name == "python-docx" and extension == ".docx":
import docx
doc = docx.Document(file_path)
paragraphs = []
for para in doc.paragraphs:
paragraphs.append(para.text)
if preserve_formatting:
formatted_sections.append({
"type": "paragraph",
"text": para.text,
"style": para.style.name if para.style else None
})
text = "\n".join(paragraphs)
method_used = "python-docx"
break
elif method_name == "mammoth":
import mammoth
with open(file_path, "rb") as docx_file:
if preserve_formatting:
result = mammoth.convert_to_html(docx_file)
text = result.value
formatted_sections.append({
"type": "html",
"content": result.value
})
else:
result = mammoth.extract_raw_text(docx_file)
text = result.value
method_used = "mammoth"
break
elif method_name == "docx2txt":
import docx2txt
text = docx2txt.process(file_path)
method_used = "docx2txt"
break
elif method_name == "olefile" and extension == ".doc":
# Basic text extraction for legacy .doc files
try:
import olefile
if olefile.isOleFile(file_path):
# This is a simplified approach - real .doc parsing is complex
with open(file_path, 'rb') as f:
content = f.read()
# Very basic text extraction attempt
text = content.decode('utf-8', errors='ignore')
# Clean up binary artifacts
import re
text = re.sub(r'[^\x20-\x7E\n\r\t]', '', text)
text = '\n'.join(line.strip() for line in text.split('\n') if line.strip())
method_used = "olefile"
break
except Exception:
continue
except ImportError:
continue
except Exception:
continue
if not method_used:
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
return {
"text": text,
"method_used": method_used,
"methods_tried": methods_tried,
"formatted_sections": formatted_sections
}
async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
"""Extract text from Excel documents."""
methods_tried = []
if extension == ".csv":
# CSV handling
import pandas as pd
try:
df = pd.read_csv(file_path)
text = df.to_string()
return {
"text": text,
"method_used": "pandas",
"methods_tried": ["pandas"],
"formatted_sections": [{"type": "table", "data": df.to_dict()}] if preserve_formatting else []
}
except Exception as e:
raise OfficeFileError(f"CSV processing failed: {str(e)}")
# Excel file handling
text = ""
formatted_sections = []
method_used = None
method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"]
for method_name in method_order:
try:
methods_tried.append(method_name)
if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]:
import openpyxl
wb = openpyxl.load_workbook(file_path, data_only=True)
text_parts = []
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
text_parts.append(f"Sheet: {sheet_name}")
for row in ws.iter_rows(values_only=True):
row_text = "\t".join(str(cell) if cell is not None else "" for cell in row)
if row_text.strip():
text_parts.append(row_text)
if preserve_formatting:
formatted_sections.append({
"type": "worksheet",
"name": sheet_name,
"data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()]
})
text = "\n".join(text_parts)
method_used = "openpyxl"
break
elif method_name == "pandas":
import pandas as pd
if extension in [".xlsx", ".xlsm"]:
dfs = pd.read_excel(file_path, sheet_name=None)
else: # .xls
dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd')
text_parts = []
for sheet_name, df in dfs.items():
text_parts.append(f"Sheet: {sheet_name}")
text_parts.append(df.to_string())
if preserve_formatting:
formatted_sections.append({
"type": "dataframe",
"name": sheet_name,
"data": df.to_dict()
})
text = "\n\n".join(text_parts)
method_used = "pandas"
break
elif method_name == "xlrd" and extension == ".xls":
import xlrd
wb = xlrd.open_workbook(file_path)
text_parts = []
for sheet in wb.sheets():
text_parts.append(f"Sheet: {sheet.name}")
for row_idx in range(sheet.nrows):
row = sheet.row_values(row_idx)
row_text = "\t".join(str(cell) for cell in row)
text_parts.append(row_text)
text = "\n".join(text_parts)
method_used = "xlrd"
break
except ImportError:
continue
except Exception:
continue
if not method_used:
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
return {
"text": text,
"method_used": method_used,
"methods_tried": methods_tried,
"formatted_sections": formatted_sections
}
async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
    """Extract text from PowerPoint documents."""
    methods_tried = []
    if extension == ".pptx":
        try:
            import pptx
            presentation = pptx.Presentation(file_path)
            slide_blocks = []
            formatted_sections = []
            # Slides are numbered from 1 for human-readable output.
            for slide_num, slide in enumerate(presentation.slides, 1):
                shape_texts = [shape.text for shape in slide.shapes
                               if hasattr(shape, "text") and shape.text]
                slide_text = "\n".join(shape_texts)
                slide_blocks.append(f"Slide {slide_num}:\n{slide_text}")
                if preserve_formatting:
                    formatted_sections.append({
                        "type": "slide",
                        "number": slide_num,
                        "text": slide_text,
                        "shapes": len(slide.shapes)
                    })
            return {
                "text": "\n\n".join(slide_blocks),
                "method_used": "python-pptx",
                "methods_tried": ["python-pptx"],
                "formatted_sections": formatted_sections
            }
        except ImportError:
            methods_tried.append("python-pptx")
        except Exception:
            methods_tried.append("python-pptx")
    # Legacy .ppt handling would require additional libraries.
    if extension == ".ppt":
        raise OfficeFileError("Legacy PowerPoint (.ppt) text extraction requires additional setup")
    raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
# Helper functions for image extraction
async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
"""Extract images from Word documents."""
images = []
if extension == ".docx":
try:
import io
import zipfile
from PIL import Image
with zipfile.ZipFile(file_path, 'r') as zip_file:
# Look for images in media folder
image_files = [f for f in zip_file.namelist() if f.startswith('word/media/')]
for i, img_path in enumerate(image_files):
try:
img_data = zip_file.read(img_path)
img = Image.open(io.BytesIO(img_data))
# Size filtering
if img.width >= min_width and img.height >= min_height:
# Save to temp file
temp_path = os.path.join(TEMP_DIR, f"word_image_{i}.{output_format}")
img.save(temp_path, format=output_format.upper())
images.append({
"index": i,
"filename": os.path.basename(img_path),
"path": temp_path,
"width": img.width,
"height": img.height,
"format": img.format,
"size_bytes": len(img_data)
})
except Exception:
continue
except Exception as e:
raise OfficeFileError(f"Word image extraction failed: {str(e)}")
return images
async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
    """Extract images from Excel documents.

    Args:
        file_path: Local path to the workbook (.xlsx/.xlsm only).
        extension: Lowercase file extension.
        output_format: Target format for the saved copies: png, jpg, jpeg.
        min_width: Skip images narrower than this many pixels.
        min_height: Skip images shorter than this many pixels.

    Returns:
        List of dicts describing each saved image (path, dimensions, size).

    Raises:
        OfficeFileError: If the archive cannot be opened or processed.
    """
    images = []
    if extension in [".xlsx", ".xlsm"]:
        try:
            import io
            import zipfile
            from PIL import Image
            # Pillow registers the JPEG writer as "JPEG", not "JPG"; direct
            # upper-casing broke the documented output_format="jpg" option.
            pil_format = output_format.upper()
            if pil_format == "JPG":
                pil_format = "JPEG"
            with zipfile.ZipFile(file_path, 'r') as zip_file:
                # An .xlsx is a zip archive; embedded media lives under xl/media/.
                image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')]
                for i, img_path in enumerate(image_files):
                    try:
                        img_data = zip_file.read(img_path)
                        img = Image.open(io.BytesIO(img_data))
                        source_format = img.format  # capture before any conversion
                        # Size filtering
                        if img.width >= min_width and img.height >= min_height:
                            # JPEG has no alpha channel; flatten to RGB first to
                            # avoid a save-time error on RGBA/LA/P images.
                            if pil_format == "JPEG" and img.mode in ("RGBA", "LA", "P"):
                                img = img.convert("RGB")
                            # Save a converted copy to the temp directory.
                            temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}")
                            img.save(temp_path, format=pil_format)
                            images.append({
                                "index": i,
                                "filename": os.path.basename(img_path),
                                "path": temp_path,
                                "width": img.width,
                                "height": img.height,
                                "format": source_format,
                                "size_bytes": len(img_data)
                            })
                    except Exception:
                        # Skip entries Pillow cannot decode or save.
                        continue
        except Exception as e:
            raise OfficeFileError(f"Excel image extraction failed: {str(e)}")
    return images
async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
    """Extract images from PowerPoint documents.

    Args:
        file_path: Local path to the presentation (only .pptx is supported).
        extension: Lowercase file extension.
        output_format: Target format for the saved copies: png, jpg, jpeg.
        min_width: Skip images narrower than this many pixels.
        min_height: Skip images shorter than this many pixels.

    Returns:
        List of dicts describing each saved image (path, dimensions, size).

    Raises:
        OfficeFileError: If the archive cannot be opened or processed.
    """
    images = []
    if extension == ".pptx":
        try:
            import io
            import zipfile
            from PIL import Image
            # Pillow registers the JPEG writer as "JPEG", not "JPG"; direct
            # upper-casing broke the documented output_format="jpg" option.
            pil_format = output_format.upper()
            if pil_format == "JPG":
                pil_format = "JPEG"
            with zipfile.ZipFile(file_path, 'r') as zip_file:
                # A .pptx is a zip archive; embedded media lives under ppt/media/.
                image_files = [f for f in zip_file.namelist() if f.startswith('ppt/media/')]
                for i, img_path in enumerate(image_files):
                    try:
                        img_data = zip_file.read(img_path)
                        img = Image.open(io.BytesIO(img_data))
                        source_format = img.format  # capture before any conversion
                        # Size filtering
                        if img.width >= min_width and img.height >= min_height:
                            # JPEG has no alpha channel; flatten to RGB first to
                            # avoid a save-time error on RGBA/LA/P images.
                            if pil_format == "JPEG" and img.mode in ("RGBA", "LA", "P"):
                                img = img.convert("RGB")
                            # Save a converted copy to the temp directory.
                            temp_path = os.path.join(TEMP_DIR, f"powerpoint_image_{i}.{output_format}")
                            img.save(temp_path, format=pil_format)
                            images.append({
                                "index": i,
                                "filename": os.path.basename(img_path),
                                "path": temp_path,
                                "width": img.width,
                                "height": img.height,
                                "format": source_format,
                                "size_bytes": len(img_data)
                            })
                    except Exception:
                        # Skip entries Pillow cannot decode or save.
                        continue
        except Exception as e:
            raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}")
    return images
# Helper functions for metadata extraction
async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]:
"""Extract basic metadata from Office documents."""
metadata = {"category": category, "extension": extension}
try:
if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]:
import zipfile
with zipfile.ZipFile(file_path, 'r') as zip_file:
# Core properties
if 'docProps/core.xml' in zip_file.namelist():
zip_file.read('docProps/core.xml').decode('utf-8')
metadata["has_core_properties"] = True
# App properties
if 'docProps/app.xml' in zip_file.namelist():
zip_file.read('docProps/app.xml').decode('utf-8')
metadata["has_app_properties"] = True
except Exception:
pass
return metadata
async def _extract_word_metadata(file_path: str, extension: str) -> dict[str, Any]:
"""Extract Word-specific metadata."""
metadata = {"type": "word", "extension": extension}
if extension == ".docx":
try:
import docx
doc = docx.Document(file_path)
core_props = doc.core_properties
metadata.update({
"title": core_props.title,
"author": core_props.author,
"subject": core_props.subject,
"keywords": core_props.keywords,
"comments": core_props.comments,
"created": str(core_props.created) if core_props.created else None,
"modified": str(core_props.modified) if core_props.modified else None
})
# Document structure
metadata.update({
"paragraph_count": len(doc.paragraphs),
"section_count": len(doc.sections),
"has_tables": len(doc.tables) > 0,
"table_count": len(doc.tables)
})
except Exception:
pass
return metadata
async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]:
"""Extract Excel-specific metadata."""
metadata = {"type": "excel", "extension": extension}
if extension in [".xlsx", ".xlsm"]:
try:
import openpyxl
wb = openpyxl.load_workbook(file_path)
props = wb.properties
metadata.update({
"title": props.title,
"creator": props.creator,
"subject": props.subject,
"description": props.description,
"keywords": props.keywords,
"created": str(props.created) if props.created else None,
"modified": str(props.modified) if props.modified else None
})
# Workbook structure
metadata.update({
"worksheet_count": len(wb.worksheets),
"worksheet_names": wb.sheetnames,
"has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets),
"has_images": any(len(ws._images) > 0 for ws in wb.worksheets)
})
except Exception:
pass
return metadata
async def _extract_powerpoint_metadata(file_path: str, extension: str) -> dict[str, Any]:
"""Extract PowerPoint-specific metadata."""
metadata = {"type": "powerpoint", "extension": extension}
if extension == ".pptx":
try:
import pptx
prs = pptx.Presentation(file_path)
core_props = prs.core_properties
metadata.update({
"title": core_props.title,
"author": core_props.author,
"subject": core_props.subject,
"keywords": core_props.keywords,
"comments": core_props.comments,
"created": str(core_props.created) if core_props.created else None,
"modified": str(core_props.modified) if core_props.modified else None
})
# Presentation structure
slide_layouts = set()
total_shapes = 0
for slide in prs.slides:
slide_layouts.add(slide.slide_layout.name)
total_shapes += len(slide.shapes)
metadata.update({
"slide_count": len(prs.slides),
"slide_layouts": list(slide_layouts),
"total_shapes": total_shapes,
"slide_width": prs.slide_width,
"slide_height": prs.slide_height
})
except Exception:
pass
return metadata
def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int:
"""Calculate document health score (1-10)."""
score = 10
# Deduct for validation errors
if not validation["is_valid"]:
score -= 5
if validation["errors"]:
score -= len(validation["errors"]) * 2
if validation["warnings"]:
score -= len(validation["warnings"])
# Deduct for problematic characteristics
if validation.get("password_protected"):
score -= 1
if format_info.get("is_legacy"):
score -= 1
structure = format_info.get("structure", {})
if structure.get("estimated_complexity") == "complex":
score -= 1
return max(1, min(10, score))
def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]:
"""Get health improvement recommendations."""
recommendations = []
if validation["errors"]:
recommendations.append("Fix validation errors before processing")
if validation.get("password_protected"):
recommendations.append("Remove password protection if possible")
if format_info.get("is_legacy"):
recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)")
structure = format_info.get("structure", {})
if structure.get("estimated_complexity") == "complex":
recommendations.append("Complex document may require specialized processing")
if not recommendations:
recommendations.append("Document appears healthy and ready for processing")
return recommendations
# Markdown conversion helper functions
async def _convert_docx_to_markdown(
file_path: str,
include_images: bool,
image_mode: str,
max_image_size: int,
preserve_structure: bool,
page_numbers: list[int],
summary_only: bool,
output_dir: str
) -> dict[str, Any]:
"""Convert .docx file to markdown with comprehensive feature support."""
import base64
# If page_numbers is specified, we need to use python-docx for page-based extraction
# as mammoth processes the entire document
if page_numbers:
return await _convert_docx_with_python_docx(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
try:
# Try mammoth first for better HTML->Markdown conversion (full document only)
import mammoth
# Configure mammoth for markdown-friendly output
with open(file_path, "rb") as docx_file:
if include_images:
# Extract images and handle them based on mode
images_info = []
def convert_image(image):
image_data = image.open()
content_type = image.content_type
ext = content_type.split('/')[-1] if '/' in content_type else 'png'
if image_mode == "base64":
if len(image_data) <= max_image_size:
encoded = base64.b64encode(image_data).decode('utf-8')
images_info.append({
"filename": f"image_{len(images_info)}.{ext}",
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "base64"
})
return {
"src": f"data:{content_type};base64,{encoded}"
}
else:
# Too large for base64, fall back to reference
filename = f"large_image_{len(images_info)}.{ext}"
images_info.append({
"filename": filename,
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "reference",
"note": "Too large for base64 encoding"
})
return {"src": filename}
elif image_mode == "files":
# Save image to file
nonlocal output_dir
if not output_dir:
output_dir = os.path.join(TEMP_DIR, "markdown_images")
os.makedirs(output_dir, exist_ok=True)
filename = f"image_{len(images_info)}.{ext}"
file_path = os.path.join(output_dir, filename)
with open(file_path, 'wb') as img_file:
img_file.write(image_data)
images_info.append({
"filename": filename,
"file_path": file_path,
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "file"
})
return {"src": file_path}
else: # references
filename = f"image_{len(images_info)}.{ext}"
images_info.append({
"filename": filename,
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "reference"
})
return {"src": filename}
# Convert with image handling
result = mammoth.convert_to_html(
docx_file,
convert_image=mammoth.images.img_element(convert_image)
)
html_content = result.value
markdown_content = _html_to_markdown(html_content, preserve_structure)
conversion_result = {
"content": markdown_content,
"method_used": "mammoth-with-images",
"images": images_info
}
else:
# Convert without images
result = mammoth.convert_to_markdown(docx_file)
markdown_content = result.value
conversion_result = {
"content": markdown_content,
"method_used": "mammoth-markdown",
"images": []
}
# Handle summary mode
if summary_only and len(markdown_content) > 5000:
# For summary mode, truncate large content
markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
# Update the conversion result
conversion_result["content"] = markdown_content
# Extract structure information
if preserve_structure:
structure = _extract_markdown_structure(markdown_content)
conversion_result["structure"] = structure
return conversion_result
except ImportError:
# Fall back to python-docx with custom markdown conversion
return await _convert_docx_with_python_docx(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
except Exception:
# Fall back to python-docx
return await _convert_docx_with_python_docx(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
async def _convert_docx_with_python_docx(
    file_path: str,
    include_images: bool,
    image_mode: str,
    max_image_size: int,
    preserve_structure: bool,
    page_numbers: list[int],
    summary_only: bool,
    output_dir: str
) -> dict[str, Any]:
    """Convert a .docx file to markdown using python-docx (mammoth fallback).

    Walks ``doc.element.body`` element-by-element so paragraphs and tables
    are emitted in document order, tracking explicit page breaks (via
    ``_has_page_break``) to honor ``page_numbers`` filtering.

    Args:
        file_path: Local path to the .docx file.
        include_images: When True, extract embedded images and append an
            "## Images" section of markdown references at the end.
        image_mode: "base64" (inline data URIs, capped by ``max_image_size``),
            "files" (absolute file-path references), or anything else for
            bare filename references.
        max_image_size: Maximum bytes for base64 inlining; larger images
            degrade to filename references with an explanatory note.
        preserve_structure: When True, map Word styles to markdown
            headings/lists/quotes and collect heading positions.
        page_numbers: 1-based pages to include; empty means "all pages".
        summary_only: Truncate content beyond 5000 characters.
        output_dir: Accepted for interface parity; not referenced in this
            function — extracted images land wherever _extract_word_images
            writes them.

    Returns:
        Dict with "content", "method_used" ("python-docx-custom"),
        "images", and optionally "pages_processed",
        "total_pages_in_range", and "structure".
    """
    import base64
    import docx
    from docx.oxml.table import CT_Tbl
    from docx.oxml.text.paragraph import CT_P
    from docx.table import Table
    from docx.text.paragraph import Paragraph
    doc = docx.Document(file_path)
    markdown_parts = []
    images_info = []
    # NOTE(review): "lists" is initialized but never incremented anywhere
    # below — list paragraphs are not counted. Confirm whether intended.
    structure_info = {"headings": [], "tables": 0, "lists": 0, "paragraphs": 0}
    # Extract images if requested
    if include_images:
        # NOTE(review): positional args after the format string are presumably
        # quality/scale knobs of _extract_word_images — verify at its definition.
        extracted_images = await _extract_word_images(file_path, ".docx", "png", 1, 1)
        for i, img in enumerate(extracted_images):
            if image_mode == "base64":
                # Inline only images small enough for the configured cap.
                if img.get("size_bytes", 0) <= max_image_size:
                    with open(img["path"], "rb") as img_file:
                        img_data = img_file.read()
                    encoded = base64.b64encode(img_data).decode('utf-8')
                    images_info.append({
                        "filename": img["filename"],
                        "content_type": f"image/{img.get('format', 'png').lower()}",
                        "size_bytes": img.get("size_bytes", 0),
                        "mode": "base64",
                        "markdown_ref": f"![Image {i+1}](data:image/{img.get('format', 'png').lower()};base64,{encoded})"
                    })
                else:
                    # Too large to inline: fall back to a filename reference.
                    images_info.append({
                        "filename": img["filename"],
                        "size_bytes": img.get("size_bytes", 0),
                        "mode": "reference",
                        "markdown_ref": f"![Image {i+1}]({img['filename']})",
                        "note": "Too large for base64 encoding"
                    })
            elif image_mode == "files":
                # Reference the extracted image by its on-disk path.
                images_info.append({
                    "filename": img["filename"],
                    "file_path": img["path"],
                    "size_bytes": img.get("size_bytes", 0),
                    "mode": "file",
                    "markdown_ref": f"![Image {i+1}]({img['path']})"
                })
            else:  # references
                images_info.append({
                    "filename": img["filename"],
                    "size_bytes": img.get("size_bytes", 0),
                    "mode": "reference",
                    "markdown_ref": f"![Image {i+1}]({img['filename']})"
                })
    # Process document elements with page filtering if specified.
    # Pages are delimited by explicit page-break runs; an empty
    # page_numbers list means "include everything".
    current_page = 1
    include_current_page = not page_numbers or current_page in page_numbers
    for element in doc.element.body:
        if isinstance(element, CT_P):
            paragraph = Paragraph(element, doc)
            # Check for page breaks
            if _has_page_break(paragraph):
                current_page += 1
                include_current_page = not page_numbers or current_page in page_numbers
                # NOTE(review): the paragraph carrying the break is skipped
                # entirely, so any text it contains is dropped — confirm intended.
                continue
            # Only process content from specified pages
            if include_current_page:
                markdown_text = _paragraph_to_markdown(paragraph, preserve_structure)
                if markdown_text.strip():
                    markdown_parts.append(markdown_text)
                    structure_info["paragraphs"] += 1
                    # Track headings (only produced when preserve_structure=True)
                    if preserve_structure and markdown_text.startswith('#'):
                        # Heading level = number of leading '#' characters.
                        level = len(markdown_text) - len(markdown_text.lstrip('#'))
                        heading_text = markdown_text.lstrip('# ').strip()
                        structure_info["headings"].append({
                            "level": level,
                            "text": heading_text,
                            "position": len(markdown_parts) - 1
                        })
        elif isinstance(element, CT_Tbl):
            # Only process tables from specified pages
            if include_current_page:
                table = Table(element, doc)
                table_markdown = _table_to_markdown(table)
                if table_markdown.strip():
                    markdown_parts.append(table_markdown)
                    structure_info["tables"] += 1
    # Add image references at the end if any
    if include_images and images_info:
        markdown_parts.append("\n## Images\n")
        for img in images_info:
            markdown_parts.append(img["markdown_ref"])
    markdown_content = "\n\n".join(markdown_parts)
    result = {
        "content": markdown_content,
        "method_used": "python-docx-custom",
        "images": images_info
    }
    # Add page filtering info
    if page_numbers:
        result["pages_processed"] = page_numbers
        result["total_pages_in_range"] = len(page_numbers)
    # Handle summary mode: cap the payload for large documents.
    if summary_only and len(markdown_content) > 5000:
        markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
        # Update the result content
        result["content"] = markdown_content
    # Add structure info
    if preserve_structure:
        result["structure"] = structure_info
    return result
async def _convert_doc_to_markdown(
    file_path: str,
    include_images: bool,
    image_mode: str,
    max_image_size: int,
    preserve_structure: bool,
    page_numbers: list[int],
    summary_only: bool,
    output_dir: str
) -> dict[str, Any]:
    """Convert a legacy .doc file to markdown using mammoth.

    Args:
        file_path: Local path to the .doc file.
        include_images, image_mode, max_image_size, page_numbers,
        output_dir: Accepted for interface parity with the .docx
            converter but not used — legacy .doc image extraction and
            page filtering are not supported here.
        preserve_structure: When True, attach structure info extracted
            from the generated markdown.
        summary_only: Truncate content beyond 5000 characters.

    Returns:
        Dict with "content", "method_used" ("mammoth-doc"),
        "images" (always empty), and optionally "structure".

    Raises:
        OfficeFileError: If mammoth is unavailable or conversion fails.
    """
    # Narrow try: only the import can raise ImportError here, and we chain
    # the original cause instead of discarding it.
    try:
        import mammoth
    except ImportError as exc:
        raise OfficeFileError("Legacy .doc conversion requires mammoth library") from exc
    try:
        with open(file_path, "rb") as doc_file:
            result = mammoth.convert_to_markdown(doc_file)
        markdown_content = result.value
        conversion_result = {
            "content": markdown_content,
            "method_used": "mammoth-doc",
            "images": []  # Legacy .doc image extraction is complex; not attempted.
        }
        # Summary mode: cap the payload for large documents.
        if summary_only and len(markdown_content) > 5000:
            markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
            conversion_result["content"] = markdown_content
        if preserve_structure:
            conversion_result["structure"] = _extract_markdown_structure(markdown_content)
        return conversion_result
    except Exception as exc:
        raise OfficeFileError(f"Legacy .doc conversion failed: {str(exc)}") from exc
def _paragraph_to_markdown(paragraph, preserve_structure: bool) -> str:
"""Convert a Word paragraph to markdown format."""
text = paragraph.text.strip()
if not text:
return ""
if not preserve_structure:
return text
# Handle different paragraph styles
style_name = paragraph.style.name.lower() if paragraph.style else ""
if "heading" in style_name:
# Extract heading level from style name
import re
level_match = re.search(r'(\d+)', style_name)
level = int(level_match.group(1)) if level_match else 1
return f"{'#' * level} {text}"
elif "title" in style_name:
return f"# {text}"
elif "subtitle" in style_name:
return f"## {text}"
elif style_name in ["list paragraph", "list"]:
return f"- {text}"
elif "quote" in style_name:
return f"> {text}"
else:
return text
def _table_to_markdown(table) -> str:
"""Convert a Word table to markdown format."""
markdown_rows = []
for i, row in enumerate(table.rows):
cells = [cell.text.strip().replace('\n', ' ') for cell in row.cells]
markdown_row = "| " + " | ".join(cells) + " |"
markdown_rows.append(markdown_row)
# Add header separator after first row
if i == 0:
separator = "| " + " | ".join(["---"] * len(cells)) + " |"
markdown_rows.append(separator)
return "\n".join(markdown_rows)
def _html_to_markdown(html_content: str, preserve_structure: bool) -> str:
"""Convert HTML content to markdown format."""
import re
# Basic HTML to Markdown conversions
conversions = [
(r'<h1[^>]*>(.*?)</h1>', r'# \1'),
(r'<h2[^>]*>(.*?)</h2>', r'## \1'),
(r'<h3[^>]*>(.*?)</h3>', r'### \1'),
(r'<h4[^>]*>(.*?)</h4>', r'#### \1'),
(r'<h5[^>]*>(.*?)</h5>', r'##### \1'),
(r'<h6[^>]*>(.*?)</h6>', r'###### \1'),
(r'<strong[^>]*>(.*?)</strong>', r'**\1**'),
(r'<b[^>]*>(.*?)</b>', r'**\1**'),
(r'<em[^>]*>(.*?)</em>', r'*\1*'),
(r'<i[^>]*>(.*?)</i>', r'*\1*'),
(r'<code[^>]*>(.*?)</code>', r'`\1`'),
(r'<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', r'[\2](\1)'),
(r'<img[^>]*src="([^"]*)"[^>]*/?>', r'![](\1)'),
(r'<p[^>]*>(.*?)</p>', r'\1\n'),
(r'<br[^>]*/?>', r'\n'),
(r'<li[^>]*>(.*?)</li>', r'- \1'),
(r'<ul[^>]*>(.*?)</ul>', r'\1'),
(r'<ol[^>]*>(.*?)</ol>', r'\1'),
(r'<blockquote[^>]*>(.*?)</blockquote>', r'> \1'),
]
markdown = html_content
for pattern, replacement in conversions:
markdown = re.sub(pattern, replacement, markdown, flags=re.DOTALL | re.IGNORECASE)
# Clean up extra whitespace
markdown = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown)
markdown = re.sub(r'^\s+|\s+$', '', markdown, flags=re.MULTILINE)
return markdown
def _chunk_markdown(content: str, chunk_size: int) -> list[dict[str, Any]]:
"""Split markdown content into chunks while preserving structure."""
chunks = []
lines = content.split('\n')
current_chunk = []
current_size = 0
chunk_num = 1
for line in lines:
line_size = len(line) + 1 # +1 for newline
# If adding this line would exceed chunk size and we have content
if current_size + line_size > chunk_size and current_chunk:
chunks.append({
"chunk_number": chunk_num,
"content": '\n'.join(current_chunk),
"character_count": current_size,
"line_count": len(current_chunk)
})
current_chunk = []
current_size = 0
chunk_num += 1
current_chunk.append(line)
current_size += line_size
# Add final chunk if there's remaining content
if current_chunk:
chunks.append({
"chunk_number": chunk_num,
"content": '\n'.join(current_chunk),
"character_count": current_size,
"line_count": len(current_chunk)
})
return chunks
def _extract_markdown_structure(content: str) -> dict[str, Any]:
"""Extract structure information from markdown content."""
import re
structure = {
"headings": [],
"lists": 0,
"links": 0,
"images": 0,
"code_blocks": 0,
"tables": 0,
"line_count": len(content.split('\n'))
}
lines = content.split('\n')
for i, line in enumerate(lines):
# Find headings
heading_match = re.match(r'^(#{1,6})\s+(.+)', line)
if heading_match:
level = len(heading_match.group(1))
text = heading_match.group(2).strip()
structure["headings"].append({
"level": level,
"text": text,
"line_number": i + 1
})
# Count other elements
if re.match(r'^[-*+]\s+', line):
structure["lists"] += 1
structure["links"] += len(re.findall(r'\[([^\]]+)\]\([^)]+\)', line))
structure["images"] += len(re.findall(r'!\[([^\]]*)\]\([^)]+\)', line))
if line.strip().startswith('```'):
structure["code_blocks"] += 1
if '|' in line and line.count('|') >= 2:
structure["tables"] += 1
return structure
def _has_page_break(paragraph) -> bool:
"""Check if a paragraph contains a page break."""
try:
# Check for explicit page breaks in paragraph runs
for run in paragraph.runs:
if run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') is not None:
br_elem = run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br')
if br_elem is not None and br_elem.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}type') == 'page':
return True
return False
except Exception:
return False
def _parse_page_range(page_range: str) -> list[int]:
"""Parse page range string into list of page numbers.
Examples:
"1-5" -> [1, 2, 3, 4, 5]
"1,3,5" -> [1, 3, 5]
"1-3,5,7-9" -> [1, 2, 3, 5, 7, 8, 9]
"""
pages = set()
for part in page_range.split(','):
part = part.strip()
if '-' in part:
# Handle range like "1-5"
start, end = part.split('-', 1)
try:
start_num = int(start.strip())
end_num = int(end.strip())
pages.update(range(start_num, end_num + 1))
except ValueError:
continue
else:
# Handle single page like "3"
try:
pages.add(int(part))
except ValueError:
continue
return sorted(list(pages))
def main():
    """Main entry point for the MCP server.

    ``--version`` as the first CLI argument prints the package version
    and exits; otherwise the FastMCP server is started.
    """
    import sys

    args = sys.argv[1:]
    if args and args[0] == "--version":
        from . import __version__
        print(f"MCP Office Tools v{__version__}")
    else:
        # Run the FastMCP server
        app.run()
# Script entry point: start the server when executed directly.
if __name__ == "__main__":
    main()