Ryan Malloy 9d6a9fc24c Refactor server architecture using mcpmixin pattern
- Split monolithic 2209-line server.py into organized mixin classes
- UniversalMixin: Format-agnostic tools (extract_text, extract_images, etc.)
- WordMixin: Word-specific tools (convert_to_markdown with chapter_name support)
- ExcelMixin: Placeholder for future Excel-specific tools
- PowerPointMixin: Placeholder for future PowerPoint-specific tools

Benefits:
• Improved maintainability and separation of concerns
• Better testability with isolated mixins
• Easier team collaboration on different file types
• Reduced cognitive load per module
• Preserved all 7 existing tools with full functionality

Architecture now supports clean expansion for format-specific tools
while maintaining backward compatibility through legacy server backup.
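A minimal sketch of the layout this refactor describes (class and method names
are illustrative, not the actual module contents, and fastmcp registration
details may differ by version):

    from fastmcp import FastMCP

    class UniversalMixin:
        """Format-agnostic tools shared by every document type."""
        async def extract_text(self, file_path: str) -> dict:
            ...

        def register(self, app: FastMCP) -> None:
            app.tool()(self.extract_text)

    class WordMixin:
        """Word-specific tools such as convert_to_markdown."""
        async def convert_to_markdown(self, file_path: str, chapter_name: str = "") -> dict:
            ...

        def register(self, app: FastMCP) -> None:
            app.tool()(self.convert_to_markdown)

    def build_server() -> FastMCP:
        app = FastMCP("MCP Office Tools")
        for mixin in (UniversalMixin(), WordMixin()):
            mixin.register(app)
        return app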
2025-09-26 13:08:53 -06:00


"""MCP Office Tools Server - Comprehensive Microsoft Office document processing.
FastMCP server providing 30+ tools for processing Word, Excel, PowerPoint documents
including both modern formats (.docx, .xlsx, .pptx) and legacy formats (.doc, .xls, .ppt).
"""
import os
import tempfile
import time
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from pydantic import Field
from .utils import (
OfficeFileError,
classify_document_type,
detect_format,
get_supported_extensions,
resolve_office_file_path,
validate_office_file,
)
# Initialize FastMCP app
app = FastMCP("MCP Office Tools")
# Configuration
TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir())
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
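# Example invocation with environment overrides (the module path shown is illustrative;
# substitute the actual package entry point):
#   OFFICE_TEMP_DIR=/var/tmp/office-tools DEBUG=true python -m mcp_office_tools.server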
@app.tool()
async def extract_text(
file_path: str = Field(description="Path to Office document or URL"),
preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"),
include_metadata: bool = Field(default=True, description="Include document metadata in output"),
method: str = Field(default="auto", description="Extraction method: auto, primary, fallback")
) -> dict[str, Any]:
"""Extract text content from Office documents with intelligent method selection.
Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt),
and CSV files. Uses multi-library fallback for maximum compatibility.
"""
start_time = time.time()
try:
# Resolve file path (download if URL)
local_path = await resolve_office_file_path(file_path)
# Validate file
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Get format info
format_info = await detect_format(local_path)
category = format_info["category"]
extension = format_info["extension"]
# Route to appropriate extraction method
if category == "word":
text_result = await _extract_word_text(local_path, extension, preserve_formatting, method)
elif category == "excel":
text_result = await _extract_excel_text(local_path, extension, preserve_formatting, method)
elif category == "powerpoint":
text_result = await _extract_powerpoint_text(local_path, extension, preserve_formatting, method)
else:
raise OfficeFileError(f"Unsupported document category: {category}")
# Compile results
result = {
"text": text_result["text"],
"method_used": text_result["method_used"],
"character_count": len(text_result["text"]),
"word_count": len(text_result["text"].split()) if text_result["text"] else 0,
"extraction_time": round(time.time() - start_time, 3),
"format_info": {
"format": format_info["format_name"],
"category": category,
"is_legacy": format_info["is_legacy"]
}
}
if include_metadata:
result["metadata"] = await _extract_basic_metadata(local_path, extension, category)
if preserve_formatting:
result["formatted_sections"] = text_result.get("formatted_sections", [])
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Text extraction failed: {str(e)}")
@app.tool()
async def extract_images(
file_path: str = Field(description="Path to Office document or URL"),
output_format: str = Field(default="png", description="Output image format: png, jpg, jpeg"),
min_width: int = Field(default=100, description="Minimum image width in pixels"),
min_height: int = Field(default=100, description="Minimum image height in pixels"),
include_metadata: bool = Field(default=True, description="Include image metadata")
) -> dict[str, Any]:
"""Extract images from Office documents with size filtering and format conversion."""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Validate file
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Get format info
format_info = await detect_format(local_path)
category = format_info["category"]
extension = format_info["extension"]
# Extract images based on format
if category == "word":
images = await _extract_word_images(local_path, extension, output_format, min_width, min_height)
elif category == "excel":
images = await _extract_excel_images(local_path, extension, output_format, min_width, min_height)
elif category == "powerpoint":
images = await _extract_powerpoint_images(local_path, extension, output_format, min_width, min_height)
else:
raise OfficeFileError(f"Image extraction not supported for category: {category}")
result = {
"images": images,
"image_count": len(images),
"extraction_time": round(time.time() - start_time, 3),
"format_info": {
"format": format_info["format_name"],
"category": category
}
}
if include_metadata:
result["total_size_bytes"] = sum(img.get("size_bytes", 0) for img in images)
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Image extraction failed: {str(e)}")
@app.tool()
async def extract_metadata(
file_path: str = Field(description="Path to Office document or URL")
) -> dict[str, Any]:
"""Extract comprehensive metadata from Office documents."""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Validate file
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Get format info
format_info = await detect_format(local_path)
category = format_info["category"]
extension = format_info["extension"]
# Extract metadata based on format
if category == "word":
metadata = await _extract_word_metadata(local_path, extension)
elif category == "excel":
metadata = await _extract_excel_metadata(local_path, extension)
elif category == "powerpoint":
metadata = await _extract_powerpoint_metadata(local_path, extension)
else:
metadata = {"category": category, "basic_info": "Limited metadata available"}
# Add file system metadata
path = Path(local_path)
stat = path.stat()
result = {
"document_metadata": metadata,
"file_metadata": {
"filename": path.name,
"file_size": stat.st_size,
"created": stat.st_ctime,
"modified": stat.st_mtime,
"extension": extension
},
"format_info": format_info,
"extraction_time": round(time.time() - start_time, 3)
}
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Metadata extraction failed: {str(e)}")
@app.tool()
async def detect_office_format(
file_path: str = Field(description="Path to Office document or URL")
) -> dict[str, Any]:
"""Intelligent Office document format detection and analysis."""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Detect format
format_info = await detect_format(local_path)
# Classify document
classification = await classify_document_type(local_path)
result = {
"format_detection": format_info,
"document_classification": classification,
"supported": format_info["is_supported"],
"processing_recommendations": format_info.get("processing_hints", []),
"detection_time": round(time.time() - start_time, 3)
}
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Format detection failed: {str(e)}")
@app.tool()
async def analyze_document_health(
file_path: str = Field(description="Path to Office document or URL")
) -> dict[str, Any]:
"""Comprehensive document health and integrity analysis."""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Validate file thoroughly
validation = await validate_office_file(local_path)
# Get format info
format_info = await detect_format(local_path)
# Health assessment
health_score = _calculate_health_score(validation, format_info)
result = {
"overall_health": "healthy" if validation["is_valid"] and health_score >= 8 else
"warning" if health_score >= 5 else "problematic",
"health_score": health_score,
"validation_results": validation,
"format_analysis": format_info,
"recommendations": _get_health_recommendations(validation, format_info),
"analysis_time": round(time.time() - start_time, 3)
}
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Health analysis failed: {str(e)}")
@app.tool()
async def convert_to_markdown(
file_path: str = Field(description="Path to Office document or URL"),
include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"),
image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"),
max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"),
preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"),
page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). RECOMMENDED for large documents. Empty = all pages"),
bookmark_name: str = Field(default="", description="Extract content for a specific bookmark/chapter (e.g., 'Chapter1_Start'). More reliable than page ranges."),
chapter_name: str = Field(default="", description="Extract content for a chapter by heading text (e.g., 'Chapter 1', 'Introduction'). Works when bookmarks aren't available."),
summary_only: bool = Field(default=False, description="Return only metadata and truncated summary. STRONGLY RECOMMENDED for large docs (>10 pages)"),
output_dir: str = Field(default="", description="Output directory for image files (if image_mode='files')")
) -> dict[str, Any]:
"""Convert Office documents to Markdown format with intelligent processing recommendations.
⚠️ RECOMMENDED WORKFLOW FOR LARGE DOCUMENTS (>5 pages):
1. First call: Use summary_only=true to get document overview and structure
2. Then: Use page_range (e.g., "1-10", "15-25") to process specific sections
This prevents response size errors and provides efficient processing.
Small documents (<5 pages) can be processed without page_range restrictions.
"""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Validate file
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Get format info
format_info = await detect_format(local_path)
category = format_info["category"]
extension = format_info["extension"]
# Currently focused on Word documents for markdown conversion
if category != "word":
raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}")
# Analyze document size and provide intelligent recommendations
doc_analysis = await _analyze_document_size(local_path, extension)
processing_recommendation = _get_processing_recommendation(
doc_analysis, page_range, summary_only
)
# Parse page range if provided
page_numbers = _parse_page_range(page_range) if page_range else None
# Prioritize bookmark/chapter extraction over page ranges
if bookmark_name or chapter_name:
page_numbers = None # Ignore page ranges when bookmark or chapter is specified
# Convert to markdown based on format
if extension == ".docx":
markdown_result = await _convert_docx_to_markdown(
local_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
)
else: # .doc
# For legacy .doc files, use mammoth if available
markdown_result = await _convert_doc_to_markdown(
local_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
# Build result based on mode
result = {
"metadata": {
"original_file": os.path.basename(local_path),
"format": format_info["format_name"],
"conversion_method": markdown_result["method_used"],
"conversion_time": round(time.time() - start_time, 3),
"summary_only": summary_only,
"document_analysis": doc_analysis,
"processing_recommendation": processing_recommendation
}
}
# Add page range info if used
if page_range:
result["metadata"]["page_range"] = page_range
result["metadata"]["pages_processed"] = len(page_numbers) if page_numbers else 0
# Add content based on mode
if summary_only:
# VERY restrictive summary mode to prevent massive responses
result["metadata"]["character_count"] = len(markdown_result["content"])
result["metadata"]["word_count"] = len(markdown_result["content"].split())
# Ultra-short summary (only 500 chars max)
result["summary"] = markdown_result["content"][:500] + "..." if len(markdown_result["content"]) > 500 else markdown_result["content"]
# Severely limit table of contents to prevent 1M+ token responses
if "table_of_contents" in markdown_result:
toc = markdown_result["table_of_contents"]
if "sections" in toc and len(toc["sections"]) > 20:
# Limit to first 20 sections only
limited_toc = {
"sections": toc["sections"][:20],
"total_sections": len(toc["sections"]),
"showing_first": 20,
"note": f"Showing first 20 of {len(toc['sections'])} sections. Use page_range to extract specific sections.",
"suggested_chunking": toc.get("suggested_chunking", [])[:10] # Limit chunking suggestions too
}
result["table_of_contents"] = limited_toc
else:
result["table_of_contents"] = toc
else:
# Include content with automatic size limiting to prevent MCP errors
content = markdown_result["content"]
# Apply aggressive content limiting to stay under 25k token limit
# Rough estimate: ~4 chars per token, leave buffer for metadata
max_content_chars = 80000 # ~20k tokens worth of content
if len(content) > max_content_chars:
# Truncate but try to preserve structure
truncated_content = _smart_truncate_content(content, max_content_chars)
result["markdown"] = truncated_content
result["content_truncated"] = True
result["original_length"] = len(content)
result["truncated_length"] = len(truncated_content)
result["truncation_note"] = f"Content truncated to stay under MCP 25k token limit. Original: {len(content):,} chars, Shown: {len(truncated_content):,} chars. Use smaller page ranges for full content."
else:
result["markdown"] = content
result["content_truncated"] = False
result["metadata"]["character_count"] = len(content)
result["metadata"]["word_count"] = len(content.split())
# Add image info
if include_images and markdown_result.get("images"):
result["images"] = markdown_result["images"]
result["metadata"]["image_count"] = len(markdown_result["images"])
result["metadata"]["total_image_size"] = sum(
img.get("size_bytes", 0) for img in markdown_result["images"]
)
# Add structure info
if preserve_structure and markdown_result.get("structure"):
result["structure"] = markdown_result["structure"]
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Markdown conversion failed: {str(e)}")
@app.tool()
async def get_supported_formats() -> dict[str, Any]:
"""Get list of all supported Office document formats and their capabilities."""
extensions = get_supported_extensions()
format_details = {}
for ext in extensions:
from .utils.validation import get_format_info
info = get_format_info(ext)
if info:
format_details[ext] = {
"format_name": info["format_name"],
"category": info["category"],
"mime_types": info["mime_types"]
}
return {
"supported_extensions": extensions,
"format_details": format_details,
"categories": {
"word": [ext for ext, info in format_details.items() if info["category"] == "word"],
"excel": [ext for ext, info in format_details.items() if info["category"] == "excel"],
"powerpoint": [ext for ext, info in format_details.items() if info["category"] == "powerpoint"]
},
"total_formats": len(extensions)
}
# Helper functions for text extraction
async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
"""Extract text from Word documents with fallback methods."""
methods_tried = []
# Method selection
if method == "auto":
if extension == ".docx":
method_order = ["python-docx", "mammoth", "docx2txt"]
else: # .doc
method_order = ["olefile", "mammoth", "docx2txt"]
elif method == "primary":
method_order = ["python-docx"] if extension == ".docx" else ["olefile"]
else: # fallback
method_order = ["mammoth", "docx2txt"]
text = ""
formatted_sections = []
method_used = None
for method_name in method_order:
try:
methods_tried.append(method_name)
if method_name == "python-docx" and extension == ".docx":
import docx
doc = docx.Document(file_path)
paragraphs = []
for para in doc.paragraphs:
paragraphs.append(para.text)
if preserve_formatting:
formatted_sections.append({
"type": "paragraph",
"text": para.text,
"style": para.style.name if para.style else None
})
text = "\n".join(paragraphs)
method_used = "python-docx"
break
elif method_name == "mammoth":
import mammoth
with open(file_path, "rb") as docx_file:
if preserve_formatting:
result = mammoth.convert_to_html(docx_file)
text = result.value
formatted_sections.append({
"type": "html",
"content": result.value
})
else:
result = mammoth.extract_raw_text(docx_file)
text = result.value
method_used = "mammoth"
break
elif method_name == "docx2txt":
import docx2txt
text = docx2txt.process(file_path)
method_used = "docx2txt"
break
elif method_name == "olefile" and extension == ".doc":
# Basic text extraction for legacy .doc files
try:
import olefile
if olefile.isOleFile(file_path):
# This is a simplified approach - real .doc parsing is complex
with open(file_path, 'rb') as f:
content = f.read()
# Very basic text extraction attempt
text = content.decode('utf-8', errors='ignore')
# Clean up binary artifacts
import re
text = re.sub(r'[^\x20-\x7E\n\r\t]', '', text)
text = '\n'.join(line.strip() for line in text.split('\n') if line.strip())
method_used = "olefile"
break
except Exception:
continue
except ImportError:
continue
except Exception:
continue
if not method_used:
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
return {
"text": text,
"method_used": method_used,
"methods_tried": methods_tried,
"formatted_sections": formatted_sections
}
async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
"""Extract text from Excel documents."""
methods_tried = []
if extension == ".csv":
# CSV handling
import pandas as pd
try:
df = pd.read_csv(file_path)
text = df.to_string()
return {
"text": text,
"method_used": "pandas",
"methods_tried": ["pandas"],
"formatted_sections": [{"type": "table", "data": df.to_dict()}] if preserve_formatting else []
}
except Exception as e:
raise OfficeFileError(f"CSV processing failed: {str(e)}")
# Excel file handling
text = ""
formatted_sections = []
method_used = None
method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"]
for method_name in method_order:
try:
methods_tried.append(method_name)
if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]:
import openpyxl
wb = openpyxl.load_workbook(file_path, data_only=True)
text_parts = []
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
text_parts.append(f"Sheet: {sheet_name}")
for row in ws.iter_rows(values_only=True):
row_text = "\t".join(str(cell) if cell is not None else "" for cell in row)
if row_text.strip():
text_parts.append(row_text)
if preserve_formatting:
formatted_sections.append({
"type": "worksheet",
"name": sheet_name,
"data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()]
})
text = "\n".join(text_parts)
method_used = "openpyxl"
break
elif method_name == "pandas":
import pandas as pd
if extension in [".xlsx", ".xlsm"]:
dfs = pd.read_excel(file_path, sheet_name=None)
else: # .xls
dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd')
text_parts = []
for sheet_name, df in dfs.items():
text_parts.append(f"Sheet: {sheet_name}")
text_parts.append(df.to_string())
if preserve_formatting:
formatted_sections.append({
"type": "dataframe",
"name": sheet_name,
"data": df.to_dict()
})
text = "\n\n".join(text_parts)
method_used = "pandas"
break
elif method_name == "xlrd" and extension == ".xls":
import xlrd
wb = xlrd.open_workbook(file_path)
text_parts = []
for sheet in wb.sheets():
text_parts.append(f"Sheet: {sheet.name}")
for row_idx in range(sheet.nrows):
row = sheet.row_values(row_idx)
row_text = "\t".join(str(cell) for cell in row)
text_parts.append(row_text)
text = "\n".join(text_parts)
method_used = "xlrd"
break
except ImportError:
continue
except Exception:
continue
if not method_used:
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
return {
"text": text,
"method_used": method_used,
"methods_tried": methods_tried,
"formatted_sections": formatted_sections
}
async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
"""Extract text from PowerPoint documents."""
methods_tried = []
if extension == ".pptx":
try:
import pptx
prs = pptx.Presentation(file_path)
text_parts = []
formatted_sections = []
for slide_num, slide in enumerate(prs.slides, 1):
slide_text_parts = []
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text:
slide_text_parts.append(shape.text)
slide_text = "\n".join(slide_text_parts)
text_parts.append(f"Slide {slide_num}:\n{slide_text}")
if preserve_formatting:
formatted_sections.append({
"type": "slide",
"number": slide_num,
"text": slide_text,
"shapes": len(slide.shapes)
})
text = "\n\n".join(text_parts)
return {
"text": text,
"method_used": "python-pptx",
"methods_tried": ["python-pptx"],
"formatted_sections": formatted_sections
}
except ImportError:
methods_tried.append("python-pptx")
except Exception:
methods_tried.append("python-pptx")
# Legacy .ppt handling would require additional libraries
if extension == ".ppt":
raise OfficeFileError("Legacy PowerPoint (.ppt) text extraction requires additional setup")
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
# Helper functions for image extraction
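# The modern OOXML formats (.docx, .xlsx, .pptx) are ZIP archives that store embedded
# images under a media/ folder (word/media, xl/media, ppt/media respectively), so the
# three extractors below share the same zipfile-plus-Pillow approach.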
async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
"""Extract images from Word documents."""
images = []
if extension == ".docx":
try:
import io
import zipfile
from PIL import Image
with zipfile.ZipFile(file_path, 'r') as zip_file:
# Look for images in media folder
image_files = [f for f in zip_file.namelist() if f.startswith('word/media/')]
for i, img_path in enumerate(image_files):
try:
img_data = zip_file.read(img_path)
img = Image.open(io.BytesIO(img_data))
# Size filtering
if img.width >= min_width and img.height >= min_height:
# Save to temp file
temp_path = os.path.join(TEMP_DIR, f"word_image_{i}.{output_format}")
img.save(temp_path, format=output_format.upper())
images.append({
"index": i,
"filename": os.path.basename(img_path),
"path": temp_path,
"width": img.width,
"height": img.height,
"format": img.format,
"size_bytes": len(img_data)
})
except Exception:
continue
except Exception as e:
raise OfficeFileError(f"Word image extraction failed: {str(e)}")
return images
async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
"""Extract images from Excel documents."""
images = []
if extension in [".xlsx", ".xlsm"]:
try:
import io
import zipfile
from PIL import Image
with zipfile.ZipFile(file_path, 'r') as zip_file:
# Look for images in media folder
image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')]
for i, img_path in enumerate(image_files):
try:
img_data = zip_file.read(img_path)
img = Image.open(io.BytesIO(img_data))
# Size filtering
if img.width >= min_width and img.height >= min_height:
# Save to temp file
temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}")
img.save(temp_path, format=output_format.upper())
images.append({
"index": i,
"filename": os.path.basename(img_path),
"path": temp_path,
"width": img.width,
"height": img.height,
"format": img.format,
"size_bytes": len(img_data)
})
except Exception:
continue
except Exception as e:
raise OfficeFileError(f"Excel image extraction failed: {str(e)}")
return images
async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
"""Extract images from PowerPoint documents."""
images = []
if extension == ".pptx":
try:
import io
import zipfile
from PIL import Image
with zipfile.ZipFile(file_path, 'r') as zip_file:
# Look for images in media folder
image_files = [f for f in zip_file.namelist() if f.startswith('ppt/media/')]
for i, img_path in enumerate(image_files):
try:
img_data = zip_file.read(img_path)
img = Image.open(io.BytesIO(img_data))
# Size filtering
if img.width >= min_width and img.height >= min_height:
# Save to temp file
temp_path = os.path.join(TEMP_DIR, f"powerpoint_image_{i}.{output_format}")
img.save(temp_path, format=output_format.upper())
images.append({
"index": i,
"filename": os.path.basename(img_path),
"path": temp_path,
"width": img.width,
"height": img.height,
"format": img.format,
"size_bytes": len(img_data)
})
except Exception:
continue
except Exception as e:
raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}")
return images
# Helper functions for metadata extraction
async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]:
"""Extract basic metadata from Office documents."""
metadata = {"category": category, "extension": extension}
try:
if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]:
import zipfile
with zipfile.ZipFile(file_path, 'r') as zip_file:
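            # OOXML packages keep document metadata in docProps/core.xml and
            # docProps/app.xml; only their presence is recorded here, the XML
            # itself is not parsed.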
# Core properties
if 'docProps/core.xml' in zip_file.namelist():
zip_file.read('docProps/core.xml').decode('utf-8')
metadata["has_core_properties"] = True
# App properties
if 'docProps/app.xml' in zip_file.namelist():
zip_file.read('docProps/app.xml').decode('utf-8')
metadata["has_app_properties"] = True
except Exception:
pass
return metadata
async def _extract_word_metadata(file_path: str, extension: str) -> dict[str, Any]:
"""Extract Word-specific metadata."""
metadata = {"type": "word", "extension": extension}
if extension == ".docx":
try:
import docx
doc = docx.Document(file_path)
core_props = doc.core_properties
metadata.update({
"title": core_props.title,
"author": core_props.author,
"subject": core_props.subject,
"keywords": core_props.keywords,
"comments": core_props.comments,
"created": str(core_props.created) if core_props.created else None,
"modified": str(core_props.modified) if core_props.modified else None
})
# Document structure
metadata.update({
"paragraph_count": len(doc.paragraphs),
"section_count": len(doc.sections),
"has_tables": len(doc.tables) > 0,
"table_count": len(doc.tables)
})
except Exception:
pass
return metadata
async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]:
"""Extract Excel-specific metadata."""
metadata = {"type": "excel", "extension": extension}
if extension in [".xlsx", ".xlsm"]:
try:
import openpyxl
wb = openpyxl.load_workbook(file_path)
props = wb.properties
metadata.update({
"title": props.title,
"creator": props.creator,
"subject": props.subject,
"description": props.description,
"keywords": props.keywords,
"created": str(props.created) if props.created else None,
"modified": str(props.modified) if props.modified else None
})
# Workbook structure
metadata.update({
"worksheet_count": len(wb.worksheets),
"worksheet_names": wb.sheetnames,
"has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets),
"has_images": any(len(ws._images) > 0 for ws in wb.worksheets)
})
except Exception:
pass
return metadata
async def _extract_powerpoint_metadata(file_path: str, extension: str) -> dict[str, Any]:
"""Extract PowerPoint-specific metadata."""
metadata = {"type": "powerpoint", "extension": extension}
if extension == ".pptx":
try:
import pptx
prs = pptx.Presentation(file_path)
core_props = prs.core_properties
metadata.update({
"title": core_props.title,
"author": core_props.author,
"subject": core_props.subject,
"keywords": core_props.keywords,
"comments": core_props.comments,
"created": str(core_props.created) if core_props.created else None,
"modified": str(core_props.modified) if core_props.modified else None
})
# Presentation structure
slide_layouts = set()
total_shapes = 0
for slide in prs.slides:
slide_layouts.add(slide.slide_layout.name)
total_shapes += len(slide.shapes)
metadata.update({
"slide_count": len(prs.slides),
"slide_layouts": list(slide_layouts),
"total_shapes": total_shapes,
"slide_width": prs.slide_width,
"slide_height": prs.slide_height
})
except Exception:
pass
return metadata
def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int:
"""Calculate document health score (1-10)."""
score = 10
# Deduct for validation errors
if not validation["is_valid"]:
score -= 5
if validation["errors"]:
score -= len(validation["errors"]) * 2
if validation["warnings"]:
score -= len(validation["warnings"])
# Deduct for problematic characteristics
if validation.get("password_protected"):
score -= 1
if format_info.get("is_legacy"):
score -= 1
structure = format_info.get("structure", {})
if structure.get("estimated_complexity") == "complex":
score -= 1
return max(1, min(10, score))
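# Worked example: a readable legacy .xls file with one validation warning scores
# 10 - 1 (warning) - 1 (legacy format) = 8, which analyze_document_health reports
# as "healthy"; an invalid file with two errors loses 5 + 4 points and lands in
# the "problematic" band.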
def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]:
"""Get health improvement recommendations."""
recommendations = []
if validation["errors"]:
recommendations.append("Fix validation errors before processing")
if validation.get("password_protected"):
recommendations.append("Remove password protection if possible")
if format_info.get("is_legacy"):
recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)")
structure = format_info.get("structure", {})
if structure.get("estimated_complexity") == "complex":
recommendations.append("Complex document may require specialized processing")
if not recommendations:
recommendations.append("Document appears healthy and ready for processing")
return recommendations
# Markdown conversion helper functions
async def _convert_docx_to_markdown(
file_path: str,
include_images: bool,
image_mode: str,
max_image_size: int,
preserve_structure: bool,
page_numbers: list[int],
summary_only: bool,
output_dir: str,
bookmark_name: str = "",
chapter_name: str = ""
) -> dict[str, Any]:
"""Convert .docx file to markdown with comprehensive feature support."""
import base64
# ULTRA-FAST summary mode - skip all complex processing
if summary_only:
return await _get_ultra_fast_summary(file_path)
# If page_numbers, bookmark_name, or chapter_name is specified, we need to use python-docx for targeted extraction
# as mammoth processes the entire document
if page_numbers or bookmark_name or chapter_name:
return await _convert_docx_with_python_docx(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
)
try:
# Try mammoth first for better HTML->Markdown conversion (full document only)
import mammoth
# Configure mammoth for markdown-friendly output
with open(file_path, "rb") as docx_file:
if include_images:
# Extract images and handle them based on mode
images_info = []
def convert_image(image):
image_data = image.open()
content_type = image.content_type
ext = content_type.split('/')[-1] if '/' in content_type else 'png'
if image_mode == "base64":
if len(image_data) <= max_image_size:
encoded = base64.b64encode(image_data).decode('utf-8')
images_info.append({
"filename": f"image_{len(images_info)}.{ext}",
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "base64"
})
return {
"src": f"data:{content_type};base64,{encoded}"
}
else:
# Too large for base64, fall back to reference
filename = f"large_image_{len(images_info)}.{ext}"
images_info.append({
"filename": filename,
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "reference",
"note": "Too large for base64 encoding"
})
return {"src": filename}
elif image_mode == "files":
# Save image to file
nonlocal output_dir
if not output_dir:
output_dir = os.path.join(TEMP_DIR, "markdown_images")
os.makedirs(output_dir, exist_ok=True)
filename = f"image_{len(images_info)}.{ext}"
file_path = os.path.join(output_dir, filename)
with open(file_path, 'wb') as img_file:
img_file.write(image_data)
images_info.append({
"filename": filename,
"file_path": file_path,
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "file"
})
return {"src": file_path}
else: # references
filename = f"image_{len(images_info)}.{ext}"
images_info.append({
"filename": filename,
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "reference"
})
return {"src": filename}
# Convert with image handling
result = mammoth.convert_to_html(
docx_file,
convert_image=mammoth.images.img_element(convert_image)
)
html_content = result.value
markdown_content = _html_to_markdown(html_content, preserve_structure)
conversion_result = {
"content": markdown_content,
"method_used": "mammoth-with-images",
"images": images_info
}
else:
# Convert without images
result = mammoth.convert_to_markdown(docx_file)
markdown_content = result.value
conversion_result = {
"content": markdown_content,
"method_used": "mammoth-markdown",
"images": []
}
# Handle summary mode
if summary_only and len(markdown_content) > 5000:
# For summary mode, truncate large content
markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
# Update the conversion result
conversion_result["content"] = markdown_content
# Extract structure information
if preserve_structure:
structure = _extract_markdown_structure(markdown_content)
conversion_result["structure"] = structure
return conversion_result
except ImportError:
# Fall back to python-docx with custom markdown conversion
return await _convert_docx_with_python_docx(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
)
except Exception:
# Fall back to python-docx
return await _convert_docx_with_python_docx(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
)
async def _convert_docx_with_python_docx(
file_path: str,
include_images: bool,
image_mode: str,
max_image_size: int,
preserve_structure: bool,
page_numbers: list[int],
summary_only: bool,
output_dir: str,
bookmark_name: str = "",
chapter_name: str = ""
) -> dict[str, Any]:
"""Convert .docx using python-docx with custom markdown conversion."""
import base64
import docx
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from docx.table import Table
from docx.text.paragraph import Paragraph
doc = docx.Document(file_path)
markdown_parts = []
images_info = []
structure_info = {"headings": [], "tables": 0, "lists": 0, "paragraphs": 0}
# Extract images if requested
if include_images:
extracted_images = await _extract_word_images(file_path, ".docx", "png", 1, 1)
for i, img in enumerate(extracted_images):
if image_mode == "base64":
if img.get("size_bytes", 0) <= max_image_size:
with open(img["path"], "rb") as img_file:
img_data = img_file.read()
encoded = base64.b64encode(img_data).decode('utf-8')
images_info.append({
"filename": img["filename"],
"content_type": f"image/{img.get('format', 'png').lower()}",
"size_bytes": img.get("size_bytes", 0),
"mode": "base64",
"markdown_ref": f"![Image {i+1}](data:image/{img.get('format', 'png').lower()};base64,{encoded})"
})
else:
images_info.append({
"filename": img["filename"],
"size_bytes": img.get("size_bytes", 0),
"mode": "reference",
"markdown_ref": f"![Image {i+1}]({img['filename']})",
"note": "Too large for base64 encoding"
})
elif image_mode == "files":
images_info.append({
"filename": img["filename"],
"file_path": img["path"],
"size_bytes": img.get("size_bytes", 0),
"mode": "file",
"markdown_ref": f"![Image {i+1}]({img['path']})"
})
else: # references
images_info.append({
"filename": img["filename"],
"size_bytes": img.get("size_bytes", 0),
"mode": "reference",
"markdown_ref": f"![Image {i+1}]({img['filename']})"
})
# Handle bookmark-based, chapter-based, or page-based extraction vs full document
if bookmark_name:
# For bookmark extraction, find the bookmark boundaries
bookmark_range = await _find_bookmark_content_range(doc, bookmark_name)
if not bookmark_range:
return {
"content": f"Bookmark '{bookmark_name}' not found in document",
"method_used": "python-docx-bookmark-not-found",
"images": [],
"bookmark_error": True
}
max_paragraphs = 500 # Generous limit for bookmark sections
max_chars = 100000
chapter_range = None
elif chapter_name:
# For chapter extraction, find the heading boundaries
chapter_range = await _find_chapter_content_range(doc, chapter_name)
if not chapter_range:
return {
"content": f"Chapter '{chapter_name}' not found in document. Available headings will be listed in processing_limits.",
"method_used": "python-docx-chapter-not-found",
"images": [],
"chapter_error": True,
"available_headings": await _get_available_headings(doc)
}
max_paragraphs = 500 # Generous limit for chapter sections
max_chars = 100000
bookmark_range = None
elif page_numbers:
# For page ranges, severely limit content extraction
max_pages_requested = max(page_numbers) if page_numbers else 1
# Rough estimate: ~20-30 paragraphs per page
max_paragraphs = min(max_pages_requested * 25, 100) # Cap at 100 paragraphs max
max_chars = min(max_pages_requested * 8000, 40000) # Cap at 40k chars max
bookmark_range = None
chapter_range = None
else:
max_paragraphs = 1000 # Large limit for full document
max_chars = 200000
bookmark_range = None
chapter_range = None
current_page = 1
processed_paragraphs = 0
total_chars = 0
include_current_page = not page_numbers or current_page in page_numbers
table_of_contents = [] # Track headings with page numbers for TOC
for element_idx, element in enumerate(doc.element.body):
# Early termination if we've processed enough content
if processed_paragraphs >= max_paragraphs or total_chars >= max_chars:
break
# Skip elements outside bookmark/chapter range if targeted extraction is used
if bookmark_range and not (bookmark_range['start_idx'] <= element_idx <= bookmark_range['end_idx']):
continue
if chapter_range and not (chapter_range['start_idx'] <= element_idx <= chapter_range['end_idx']):
continue
if isinstance(element, CT_P):
paragraph = Paragraph(element, doc)
# Check for page breaks
if _has_page_break(paragraph):
current_page += 1
include_current_page = not page_numbers or current_page in page_numbers
continue
            # Skip paragraphs that fall on pages outside the requested page_range
            if not include_current_page:
                continue
            # Process content with strict limits
            markdown_text = _paragraph_to_markdown(paragraph, preserve_structure)
            if markdown_text.strip():
# Check if adding this would exceed limits
text_length = len(markdown_text)
if total_chars + text_length > max_chars:
break # Stop processing
markdown_parts.append(markdown_text)
processed_paragraphs += 1
total_chars += text_length
structure_info["paragraphs"] += 1
# Track headings for both structure and TOC
if preserve_structure and markdown_text.startswith('#'):
level = len(markdown_text) - len(markdown_text.lstrip('#'))
heading_text = markdown_text.lstrip('# ').strip()
heading_info = {
"level": level,
"text": heading_text,
"position": len(markdown_parts) - 1,
"page": current_page
}
structure_info["headings"].append(heading_info)
# Add to table of contents
table_of_contents.append({
"level": level,
"title": heading_text,
"page": current_page,
"suggested_page_range": f"{current_page}-{current_page + _estimate_section_length(level)}"
})
elif isinstance(element, CT_Tbl):
# Process tables with strict limits
if processed_paragraphs < max_paragraphs and total_chars < max_chars:
table = Table(element, doc)
table_markdown = _table_to_markdown(table)
if table_markdown.strip():
table_length = len(table_markdown)
if total_chars + table_length > max_chars:
break # Stop processing
markdown_parts.append(table_markdown)
total_chars += table_length
structure_info["tables"] += 1
# Add image references at the end if any
if include_images and images_info:
markdown_parts.append("\n## Images\n")
for img in images_info:
markdown_parts.append(img["markdown_ref"])
markdown_content = "\n\n".join(markdown_parts)
result = {
"content": markdown_content,
"method_used": "python-docx-custom",
"images": images_info
}
# Add table of contents for navigation
if table_of_contents:
result["table_of_contents"] = _optimize_toc_page_ranges(table_of_contents)
# Add processing limits info
result["processing_limits"] = {
"max_paragraphs_allowed": max_paragraphs,
"max_chars_allowed": max_chars,
"paragraphs_processed": processed_paragraphs,
"chars_processed": total_chars,
"content_truncated": processed_paragraphs >= max_paragraphs or total_chars >= max_chars,
"note": f"Processed {processed_paragraphs}/{max_paragraphs} paragraphs, {total_chars:,}/{max_chars:,} chars"
}
# Add extraction method info
if bookmark_name and bookmark_range:
result["bookmark_extraction"] = {
"bookmark_name": bookmark_name,
"elements_range": f"{bookmark_range['start_idx']}-{bookmark_range['end_idx']}",
"extraction_note": bookmark_range["note"]
}
elif chapter_name and chapter_range:
result["chapter_extraction"] = {
"chapter_name": chapter_name,
"elements_range": f"{chapter_range['start_idx']}-{chapter_range['end_idx']}",
"extraction_note": chapter_range["note"]
}
elif page_numbers:
result["pages_processed"] = page_numbers
result["total_pages_in_range"] = len(page_numbers)
# Handle summary mode
if summary_only and len(markdown_content) > 5000:
markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
# Update the result content
result["content"] = markdown_content
# Add structure info
if preserve_structure:
result["structure"] = structure_info
return result
async def _convert_doc_to_markdown(
file_path: str,
include_images: bool,
image_mode: str,
max_image_size: int,
preserve_structure: bool,
page_numbers: list[int],
summary_only: bool,
output_dir: str
) -> dict[str, Any]:
"""Convert legacy .doc file to markdown using available methods."""
try:
import mammoth
with open(file_path, "rb") as doc_file:
result = mammoth.convert_to_markdown(doc_file)
markdown_content = result.value
conversion_result = {
"content": markdown_content,
"method_used": "mammoth-doc",
"images": [] # Legacy .doc image extraction is complex
}
# Handle summary mode
if summary_only and len(markdown_content) > 5000:
markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
# Update the conversion result
conversion_result["content"] = markdown_content
if preserve_structure:
structure = _extract_markdown_structure(markdown_content)
conversion_result["structure"] = structure
return conversion_result
except ImportError:
raise OfficeFileError("Legacy .doc conversion requires mammoth library")
except Exception as e:
raise OfficeFileError(f"Legacy .doc conversion failed: {str(e)}")
def _paragraph_to_markdown(paragraph, preserve_structure: bool) -> str:
"""Convert a Word paragraph to markdown format."""
text = paragraph.text.strip()
if not text:
return ""
if not preserve_structure:
return text
# Handle different paragraph styles
style_name = paragraph.style.name.lower() if paragraph.style else ""
if "heading" in style_name:
# Extract heading level from style name
import re
level_match = re.search(r'(\d+)', style_name)
level = int(level_match.group(1)) if level_match else 1
return f"{'#' * level} {text}"
elif "title" in style_name:
return f"# {text}"
elif "subtitle" in style_name:
return f"## {text}"
elif style_name in ["list paragraph", "list"]:
return f"- {text}"
elif "quote" in style_name:
return f"> {text}"
else:
return text
def _table_to_markdown(table) -> str:
"""Convert a Word table to markdown format."""
markdown_rows = []
for i, row in enumerate(table.rows):
cells = [cell.text.strip().replace('\n', ' ') for cell in row.cells]
markdown_row = "| " + " | ".join(cells) + " |"
markdown_rows.append(markdown_row)
# Add header separator after first row
if i == 0:
separator = "| " + " | ".join(["---"] * len(cells)) + " |"
markdown_rows.append(separator)
return "\n".join(markdown_rows)
def _html_to_markdown(html_content: str, preserve_structure: bool) -> str:
"""Convert HTML content to markdown format."""
import re
# Basic HTML to Markdown conversions
conversions = [
(r'<h1[^>]*>(.*?)</h1>', r'# \1'),
(r'<h2[^>]*>(.*?)</h2>', r'## \1'),
(r'<h3[^>]*>(.*?)</h3>', r'### \1'),
(r'<h4[^>]*>(.*?)</h4>', r'#### \1'),
(r'<h5[^>]*>(.*?)</h5>', r'##### \1'),
(r'<h6[^>]*>(.*?)</h6>', r'###### \1'),
(r'<strong[^>]*>(.*?)</strong>', r'**\1**'),
(r'<b[^>]*>(.*?)</b>', r'**\1**'),
(r'<em[^>]*>(.*?)</em>', r'*\1*'),
(r'<i[^>]*>(.*?)</i>', r'*\1*'),
(r'<code[^>]*>(.*?)</code>', r'`\1`'),
(r'<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', r'[\2](\1)'),
(r'<img[^>]*src="([^"]*)"[^>]*/?>', r'![](\1)'),
(r'<p[^>]*>(.*?)</p>', r'\1\n'),
(r'<br[^>]*/?>', r'\n'),
(r'<li[^>]*>(.*?)</li>', r'- \1'),
(r'<ul[^>]*>(.*?)</ul>', r'\1'),
(r'<ol[^>]*>(.*?)</ol>', r'\1'),
(r'<blockquote[^>]*>(.*?)</blockquote>', r'> \1'),
]
markdown = html_content
for pattern, replacement in conversions:
markdown = re.sub(pattern, replacement, markdown, flags=re.DOTALL | re.IGNORECASE)
# Clean up extra whitespace
markdown = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown)
markdown = re.sub(r'^\s+|\s+$', '', markdown, flags=re.MULTILINE)
return markdown
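# Example: '<h2>Results</h2>' becomes '## Results', 'all <b>tests</b> passed'
# becomes 'all **tests** passed', and '<a href="https://example.com">docs</a>'
# becomes '[docs](https://example.com)'.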
def _chunk_markdown(content: str, chunk_size: int) -> list[dict[str, Any]]:
"""Split markdown content into chunks while preserving structure."""
chunks = []
lines = content.split('\n')
current_chunk = []
current_size = 0
chunk_num = 1
for line in lines:
line_size = len(line) + 1 # +1 for newline
# If adding this line would exceed chunk size and we have content
if current_size + line_size > chunk_size and current_chunk:
chunks.append({
"chunk_number": chunk_num,
"content": '\n'.join(current_chunk),
"character_count": current_size,
"line_count": len(current_chunk)
})
current_chunk = []
current_size = 0
chunk_num += 1
current_chunk.append(line)
current_size += line_size
# Add final chunk if there's remaining content
if current_chunk:
chunks.append({
"chunk_number": chunk_num,
"content": '\n'.join(current_chunk),
"character_count": current_size,
"line_count": len(current_chunk)
})
return chunks
def _extract_markdown_structure(content: str) -> dict[str, Any]:
"""Extract structure information from markdown content."""
import re
structure = {
"headings": [],
"lists": 0,
"links": 0,
"images": 0,
"code_blocks": 0,
"tables": 0,
"line_count": len(content.split('\n'))
}
lines = content.split('\n')
for i, line in enumerate(lines):
# Find headings
heading_match = re.match(r'^(#{1,6})\s+(.+)', line)
if heading_match:
level = len(heading_match.group(1))
text = heading_match.group(2).strip()
structure["headings"].append({
"level": level,
"text": text,
"line_number": i + 1
})
# Count other elements
if re.match(r'^[-*+]\s+', line):
structure["lists"] += 1
structure["links"] += len(re.findall(r'\[([^\]]+)\]\([^)]+\)', line))
structure["images"] += len(re.findall(r'!\[([^\]]*)\]\([^)]+\)', line))
if line.strip().startswith('```'):
structure["code_blocks"] += 1
if '|' in line and line.count('|') >= 2:
structure["tables"] += 1
return structure
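# Example: for the content "# Title\n- item one\nSee [docs](https://example.com)",
# the returned structure reports one level-1 heading, one list item, one link,
# and a line_count of 3.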
async def _find_bookmark_content_range(doc, bookmark_name: str) -> dict[str, Any]:
"""Find the content range for a specific bookmark."""
try:
# Find bookmark start and end positions in the document
bookmark_starts = {}
bookmark_ends = {}
# Look for bookmark markers in the document XML
for elem_idx, element in enumerate(doc.element.body):
# Look for bookmark start markers
for bookmark_start in element.xpath('.//w:bookmarkStart', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}):
name = bookmark_start.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name')
if name == bookmark_name:
bookmark_id = bookmark_start.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}id')
bookmark_starts[bookmark_id] = elem_idx
# Look for bookmark end markers
for bookmark_end in element.xpath('.//w:bookmarkEnd', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}):
bookmark_id = bookmark_end.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}id')
if bookmark_id in bookmark_starts:
bookmark_ends[bookmark_id] = elem_idx
break
# Find the bookmark range
for bookmark_id, start_idx in bookmark_starts.items():
if bookmark_id in bookmark_ends:
end_idx = bookmark_ends[bookmark_id]
# Extend range to capture full sections (look for next major heading)
extended_end = min(end_idx + 50, len(doc.element.body) - 1) # Extend by 50 elements or end of doc
return {
'start_idx': start_idx,
'end_idx': extended_end,
'bookmark_id': bookmark_id,
'note': f"Extracting content from bookmark '{bookmark_name}' (elements {start_idx}-{extended_end})"
}
return None # Bookmark not found
except Exception:
return None # Error finding bookmark
async def _find_chapter_content_range(doc, chapter_name: str) -> dict[str, Any]:
"""Find the content range for a specific chapter by heading text."""
try:
# Find heading that matches the chapter name
chapter_start_idx = None
chapter_end_idx = None
# Search through document elements for matching heading
for elem_idx, element in enumerate(doc.element.body):
# Check if this element is a paragraph with heading style
try:
para = element
if para.tag.endswith('}p'): # Word paragraph element
# Get the text content
text_content = ''.join(text_elem.text or '' for text_elem in para.xpath('.//w:t', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}))
# Check if this matches our chapter name (case insensitive, flexible matching)
if text_content.strip() and chapter_name.lower() in text_content.lower().strip():
# Check if it's actually a heading by looking at paragraph style
style_elem = para.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})
if style_elem:
style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '')
if 'heading' in style_val.lower() or 'title' in style_val.lower():
chapter_start_idx = elem_idx
break
# Also consider short text lines as potential headings
elif len(text_content.strip()) < 100:
chapter_start_idx = elem_idx
break
except Exception:
continue
if chapter_start_idx is None:
return None # Chapter heading not found
# Find the end of this chapter (next major heading or end of document)
chapter_end_idx = len(doc.element.body) - 1 # Default to end of document
# Look for the next major heading to determine chapter end
for elem_idx in range(chapter_start_idx + 1, len(doc.element.body)):
try:
para = doc.element.body[elem_idx]
if para.tag.endswith('}p'):
# Check if this is a major heading (same level or higher than chapter start)
style_elem = para.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})
if style_elem:
style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '')
if 'heading1' in style_val.lower() or 'title' in style_val.lower():
chapter_end_idx = elem_idx - 1
break
except Exception:
continue
return {
'start_idx': chapter_start_idx,
'end_idx': chapter_end_idx,
'chapter_name': chapter_name,
'note': f"Extracting content for chapter '{chapter_name}' (elements {chapter_start_idx}-{chapter_end_idx})"
}
except Exception:
return None # Error finding chapter
async def _get_available_headings(doc) -> list[str]:
"""Extract available headings from the document to help users find chapter names."""
try:
headings = []
# Search through document elements for headings
for element in doc.element.body[:100]: # Only check first 100 elements to avoid token issues
try:
if element.tag.endswith('}p'): # Word paragraph element
# Get the text content
text_content = ''.join(text_elem.text or '' for text_elem in element.xpath('.//w:t', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}))
if text_content.strip():
# Check if it's a heading by looking at paragraph style
style_elem = element.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})
if style_elem:
style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '')
if 'heading' in style_val.lower() or 'title' in style_val.lower():
headings.append(text_content.strip()[:100]) # Limit heading length
# Also consider short text lines as potential headings
elif len(text_content.strip()) < 100:
# Only add if it looks like a heading (not just short random text)
if any(word in text_content.lower() for word in ['chapter', 'section', 'part', 'introduction', 'conclusion']):
headings.append(text_content.strip())
except Exception:
continue
return headings[:20] # Return max 20 headings to avoid token issues
except Exception:
return []
async def _get_ultra_fast_summary(file_path: str) -> dict[str, Any]:
"""Ultra-fast summary that extracts minimal data to prevent MCP token limits."""
try:
import docx
doc = docx.Document(file_path)
# Extract only the first few paragraphs and major headings
content_parts = []
heading_count = 0
paragraph_count = 0
max_content_length = 2000 # Very short limit
current_length = 0
# Get basic structure info quickly
total_paragraphs = len(doc.paragraphs)
total_tables = len(doc.tables)
# Extract bookmarks (chapter markers)
bookmarks = []
try:
# Access document's bookmarks through the XML
for bookmark in doc.element.xpath('//w:bookmarkStart', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}):
bookmark_name = bookmark.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name')
if bookmark_name and not bookmark_name.startswith('_'): # Skip system bookmarks
bookmarks.append(bookmark_name)
except Exception:
pass # Bookmarks extraction failed, continue without
# Extract just a few key headings and the start of content
for para in doc.paragraphs[:50]: # Only check first 50 paragraphs
text = para.text.strip()
if not text:
continue
# Check if it's a heading (simple heuristic)
is_heading = (para.style and "heading" in para.style.name.lower()) or len(text) < 100
if is_heading and heading_count < 10: # Max 10 headings
content_parts.append(f"# {text}")
heading_count += 1
current_length += len(text) + 3
elif paragraph_count < 5 and current_length < max_content_length: # Max 5 paragraphs
content_parts.append(text)
paragraph_count += 1
current_length += len(text)
if current_length > max_content_length:
break
# Create very basic summary
summary_content = "\n\n".join(content_parts)
# Extract available headings for chapter navigation
available_headings = await _get_available_headings(doc)
return {
"content": summary_content,
"method_used": "ultra-fast-summary",
"table_of_contents": {
"note": "Use full document processing for detailed TOC",
"basic_info": f"Document has ~{total_paragraphs} paragraphs, {total_tables} tables, {heading_count} headings found in first scan",
"bookmarks": bookmarks[:20] if bookmarks else [], # Limit to first 20 bookmarks
"bookmark_count": len(bookmarks),
"bookmark_note": "Bookmarks often indicate chapter starts. Use these as navigation hints for page_range extraction.",
"available_headings": available_headings[:10] if available_headings else [], # Limit to first 10 headings
"heading_count": len(available_headings),
"heading_note": "Use these headings with chapter_name parameter for chapter-based extraction when bookmarks are not available."
}
}
except Exception as e:
return {
"content": f"Error creating summary: {str(e)}",
"method_used": "error-fallback",
"table_of_contents": {"note": "Summary generation failed"}
}
def _smart_truncate_content(content: str, max_chars: int) -> str:
"""Intelligently truncate content while preserving structure and readability."""
if len(content) <= max_chars:
return content
lines = content.split('\n')
truncated_lines = []
current_length = 0
# Try to preserve structure by stopping at a natural break point
for line in lines:
line_length = len(line) + 1 # +1 for newline
# If adding this line would exceed limit
if current_length + line_length > max_chars:
# Try to find a good stopping point
if truncated_lines:
                # If we stopped mid-paragraph, drop the incomplete trailing lines
if not (line.strip() == '' or line.startswith('#') or line.startswith('|')):
# Remove lines until we hit a natural break
while truncated_lines and not (
truncated_lines[-1].strip() == '' or
truncated_lines[-1].startswith('#') or
truncated_lines[-1].startswith('|') or
truncated_lines[-1].startswith('-') or
truncated_lines[-1].startswith('*')
):
truncated_lines.pop()
break
truncated_lines.append(line)
current_length += line_length
# Add truncation notice
result = '\n'.join(truncated_lines)
result += f"\n\n---\n**[CONTENT TRUNCATED]**\nShowing {len(result):,} of {len(content):,} characters.\nUse smaller page ranges (e.g., 3-5 pages) for full content without truncation.\n---"
return result
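
# Hedged sketch of _smart_truncate_content on markdown-like text. The sample content
# and the 200-character budget are illustrative only; the helper is never invoked at
# import time.
def _example_smart_truncate() -> None:
    """Demonstrate structure-preserving truncation with a small character budget."""
    sample = "\n".join(
        ["# Chapter 1", "", "First paragraph of the chapter.", ""]
        + [f"Filler paragraph number {i}." for i in range(20)]
    )
    truncated = _smart_truncate_content(sample, max_chars=200)
    assert "[CONTENT TRUNCATED]" in truncated   # truncation notice is appended
    assert truncated.endswith("---")
    # Content already under the budget is returned unchanged
    assert _smart_truncate_content("short text", max_chars=200) == "short text"
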
def _estimate_section_length(heading_level: int) -> int:
"""Estimate how many pages a section might span based on heading level."""
# Higher level headings (H1) tend to have longer sections
if heading_level == 1: # Major chapters
return 8
elif heading_level == 2: # Major sections
return 4
elif heading_level == 3: # Subsections
return 2
else: # Minor headings
return 1
def _optimize_toc_page_ranges(toc_entries: list) -> dict[str, Any]:
"""Optimize table of contents page ranges based on actual heading positions."""
optimized_toc = {
"sections": [],
"total_sections": len(toc_entries),
"suggested_chunking": []
}
for i, entry in enumerate(toc_entries):
# Calculate actual end page based on next heading or document end
if i + 1 < len(toc_entries):
next_page = toc_entries[i + 1]["page"]
actual_end_page = max(entry["page"], next_page - 1)
else:
# Last section - use estimated length
actual_end_page = entry["page"] + _estimate_section_length(entry["level"])
optimized_entry = {
"level": entry["level"],
"title": entry["title"],
"start_page": entry["page"],
"estimated_end_page": actual_end_page,
"suggested_page_range": f"{entry['page']}-{actual_end_page}",
"section_type": _classify_section_type(entry["level"], entry["title"])
}
optimized_toc["sections"].append(optimized_entry)
# Generate chunking suggestions
optimized_toc["suggested_chunking"] = _generate_chunking_suggestions(optimized_toc["sections"])
return optimized_toc
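
# Hedged example of the TOC optimizer above. The entry shape (level/title/page keys)
# mirrors what _optimize_toc_page_ranges expects; the sample entries themselves are
# made up, and the function is only for manual experimentation.
def _example_optimize_toc() -> None:
    """Demonstrate page-range optimization and chunking suggestions for a small TOC."""
    toc_entries = [
        {"level": 1, "title": "Chapter 1: Introduction", "page": 1},
        {"level": 2, "title": "Background", "page": 3},
        {"level": 1, "title": "Chapter 2: Methods", "page": 7},
    ]
    optimized = _optimize_toc_page_ranges(toc_entries)
    assert optimized["total_sections"] == 3
    # Each section gets a "start-end" string usable as a page_range argument
    assert optimized["sections"][0]["suggested_page_range"] == "1-2"
    # Sections are also grouped into roughly 8-page extraction chunks
    assert len(optimized["suggested_chunking"]) >= 1
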
def _classify_section_type(level: int, title: str) -> str:
"""Classify section type based on level and title patterns."""
title_lower = title.lower()
if level == 1:
if any(word in title_lower for word in ["chapter", "part", "section"]):
return "chapter"
elif any(word in title_lower for word in ["introduction", "conclusion", "summary"]):
return "special_section"
else:
return "major_section"
elif level == 2:
return "section"
elif level == 3:
return "subsection"
else:
return "minor_heading"
def _generate_chunking_suggestions(sections: list) -> list[dict[str, Any]]:
"""Generate smart chunking suggestions based on document structure."""
suggestions = []
current_chunk_pages = 0
chunk_start = 1
chunk_sections = []
for section in sections:
section_pages = section["estimated_end_page"] - section["start_page"] + 1
# If adding this section would make chunk too large, finalize current chunk
# Use smaller chunks (8 pages) to prevent MCP token limit issues
if current_chunk_pages + section_pages > 8 and chunk_sections:
suggestions.append({
"chunk_number": len(suggestions) + 1,
"page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}",
"sections_included": [s["title"] for s in chunk_sections],
"estimated_pages": current_chunk_pages,
"description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" +
(f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "")
})
# Start new chunk
chunk_start = section["start_page"]
current_chunk_pages = section_pages
chunk_sections = [section]
else:
# Add to current chunk
current_chunk_pages += section_pages
chunk_sections.append(section)
# Add final chunk if any sections remain
if chunk_sections:
suggestions.append({
"chunk_number": len(suggestions) + 1,
"page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}",
"sections_included": [s["title"] for s in chunk_sections],
"estimated_pages": current_chunk_pages,
"description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" +
(f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "")
})
return suggestions
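
# Hedged sketch of the chunking helper in isolation. The section dicts below imitate
# the output of _optimize_toc_page_ranges (title/start_page/estimated_end_page keys);
# two ~6-page chapters exceed the 8-page budget together, so they land in separate chunks.
def _example_chunking_suggestions() -> None:
    sections = [
        {"title": "Chapter 1", "start_page": 1, "estimated_end_page": 6},
        {"title": "Chapter 2", "start_page": 7, "estimated_end_page": 12},
    ]
    chunks = _generate_chunking_suggestions(sections)
    assert [c["page_range"] for c in chunks] == ["1-6", "7-12"]
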
def _has_page_break(paragraph) -> bool:
    """Check if a paragraph contains a page break."""
    w_ns = '{http://schemas.openxmlformats.org/wordprocessingml/2006/main}'
    try:
        # Look for an explicit <w:br w:type="page"/> element in any of the paragraph's runs
        for run in paragraph.runs:
            br_elem = run._r.find(f'.//{w_ns}br')
            if br_elem is not None and br_elem.get(f'{w_ns}type') == 'page':
                return True
        return False
    except Exception:
        return False
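
# Hedged, assumption-laden sketch for the page-break detector: build an in-memory
# python-docx document, insert an explicit page break via WD_BREAK.PAGE, and confirm
# _has_page_break sees it. Requires python-docx and is intended for manual runs only.
def _example_has_page_break() -> None:
    import docx
    from docx.enum.text import WD_BREAK

    doc = docx.Document()
    plain = doc.add_paragraph("No break here.")
    broken = doc.add_paragraph("Break after this run.")
    broken.runs[0].add_break(WD_BREAK.PAGE)  # emits <w:br w:type="page"/>
    assert _has_page_break(plain) is False
    assert _has_page_break(broken) is True
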
def _parse_page_range(page_range: str) -> list[int]:
"""Parse page range string into list of page numbers.
Examples:
"1-5" -> [1, 2, 3, 4, 5]
"1,3,5" -> [1, 3, 5]
"1-3,5,7-9" -> [1, 2, 3, 5, 7, 8, 9]
"""
pages = set()
for part in page_range.split(','):
part = part.strip()
if '-' in part:
# Handle range like "1-5"
start, end = part.split('-', 1)
try:
start_num = int(start.strip())
end_num = int(end.strip())
pages.update(range(start_num, end_num + 1))
except ValueError:
continue
else:
# Handle single page like "3"
try:
pages.add(int(part))
except ValueError:
continue
return sorted(list(pages))
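
# Illustrative check (not part of the server's tool surface): a minimal sketch of the
# page-range grammar _parse_page_range accepts. The sample ranges are made up and the
# function is never called at import time.
def _example_parse_page_range() -> None:
    """Demonstrate the "1-3,5,7-9"-style grammar handled by _parse_page_range."""
    assert _parse_page_range("1-5") == [1, 2, 3, 4, 5]
    assert _parse_page_range("1,3,5") == [1, 3, 5]
    assert _parse_page_range("1-3,5,7-9") == [1, 2, 3, 5, 7, 8, 9]
    # Malformed parts are skipped rather than raising
    assert _parse_page_range("2-4,oops,6") == [2, 3, 4, 6]
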
async def _analyze_document_size(file_path: str, extension: str) -> dict[str, Any]:
"""Analyze document to estimate size and complexity."""
analysis = {
"estimated_pages": 1,
"file_size_mb": 0,
"complexity": "simple",
"estimated_content_size": "small"
}
try:
        # Get file size (Path is already imported at module level)
        file_size = Path(file_path).stat().st_size
analysis["file_size_mb"] = round(file_size / (1024 * 1024), 2)
if extension == ".docx":
try:
import docx
doc = docx.Document(file_path)
# Estimate pages based on content
paragraph_count = len(doc.paragraphs)
table_count = len(doc.tables)
# Rough estimation: ~40 paragraphs per page
estimated_pages = max(1, paragraph_count // 40)
analysis["estimated_pages"] = estimated_pages
# Determine complexity
if table_count > 10 or paragraph_count > 500:
analysis["complexity"] = "complex"
elif table_count > 5 or paragraph_count > 200:
analysis["complexity"] = "moderate"
# Estimate content size
if estimated_pages > 20:
analysis["estimated_content_size"] = "very_large"
elif estimated_pages > 10:
analysis["estimated_content_size"] = "large"
elif estimated_pages > 5:
analysis["estimated_content_size"] = "medium"
except Exception:
# Fallback to file size estimation
if file_size > 5 * 1024 * 1024: # 5MB
analysis["estimated_pages"] = 50
analysis["estimated_content_size"] = "very_large"
elif file_size > 1 * 1024 * 1024: # 1MB
analysis["estimated_pages"] = 20
analysis["estimated_content_size"] = "large"
elif file_size > 500 * 1024: # 500KB
analysis["estimated_pages"] = 10
analysis["estimated_content_size"] = "medium"
except Exception:
pass
return analysis
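
# Hedged sketch for the size analyzer: generate a throwaway .docx with python-docx
# (optional dependency) and inspect the resulting estimates. The asserted values follow
# the heuristics coded above; the sample document itself is illustrative only.
def _example_analyze_document_size() -> None:
    import asyncio

    import docx

    doc = docx.Document()
    for i in range(10):
        doc.add_paragraph(f"Illustrative paragraph {i}.")
    with tempfile.TemporaryDirectory() as tmp:
        path = str(Path(tmp) / "tiny.docx")
        doc.save(path)
        analysis = asyncio.run(_analyze_document_size(path, ".docx"))
    assert analysis["estimated_pages"] == 1        # 10 paragraphs // 40 -> at least 1 page
    assert analysis["complexity"] == "simple"
    assert analysis["estimated_content_size"] == "small"
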
def _get_processing_recommendation(
doc_analysis: dict[str, Any],
page_range: str,
summary_only: bool
) -> dict[str, Any]:
"""Generate intelligent processing recommendations based on document analysis."""
estimated_pages = doc_analysis["estimated_pages"]
content_size = doc_analysis["estimated_content_size"]
recommendation = {
"status": "optimal",
"message": "",
"suggested_workflow": [],
"warnings": []
}
# Large document recommendations
if content_size in ["large", "very_large"] and not page_range and not summary_only:
recommendation["status"] = "suboptimal"
recommendation["message"] = (
f"⚠️ Large document detected ({estimated_pages} estimated pages). "
"Consider using recommended workflow for better performance."
)
recommendation["suggested_workflow"] = [
"1. First: Call with summary_only=true to get document overview and TOC",
"2. Then: Use page_range to process specific sections (e.g., '1-5', '6-10', '15-20')",
"3. Recommended: Use 3-8 page chunks to stay under 25k token MCP limit",
"4. The tool auto-truncates if content is too large, but smaller ranges work better"
]
recommendation["warnings"] = [
"Page ranges >8 pages may hit 25k token response limit and get truncated",
"Use smaller page ranges (3-5 pages) for dense content documents",
"Auto-truncation preserves structure but loses content completeness"
]
# Medium document recommendations
elif content_size == "medium" and not page_range and not summary_only:
recommendation["status"] = "caution"
recommendation["message"] = (
f"Medium document detected ({estimated_pages} estimated pages). "
"Consider summary_only=true first if you encounter response size issues."
)
recommendation["suggested_workflow"] = [
"Option 1: Try full processing (current approach)",
"Option 2: Use summary_only=true first, then page_range if needed"
]
# Optimal usage patterns
elif summary_only:
recommendation["message"] = "✅ Excellent! Using summary mode for initial document analysis."
recommendation["suggested_workflow"] = [
"After reviewing summary, use page_range to extract specific sections of interest"
]
elif page_range and content_size in ["large", "very_large"]:
recommendation["message"] = "✅ Perfect! Using page-range processing for efficient extraction."
elif content_size == "small":
recommendation["message"] = "✅ Small document - full processing is optimal."
return recommendation
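
# Hedged example of the recommendation logic above: feed it a hand-built analysis dict
# shaped like _analyze_document_size output and observe the suggested workflow. The
# numbers are invented for illustration; nothing here runs at import time.
def _example_processing_recommendation() -> None:
    large_doc = {
        "estimated_pages": 40,
        "file_size_mb": 6.5,
        "complexity": "complex",
        "estimated_content_size": "very_large",
    }
    rec = _get_processing_recommendation(large_doc, page_range="", summary_only=False)
    assert rec["status"] == "suboptimal"
    assert rec["suggested_workflow"]               # workflow steps are provided
    # Summary-first usage is reported as an optimal path
    rec_summary = _get_processing_recommendation(large_doc, page_range="", summary_only=True)
    assert rec_summary["status"] == "optimal"
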
def main():
"""Main entry point for the MCP server."""
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--version":
from . import __version__
print(f"MCP Office Tools v{__version__}")
return
# Run the FastMCP server
app.run()
if __name__ == "__main__":
main()