Ryan Malloy b2033fc239 🔥 Fix critical issue: page_range was processing entire document
- Replace unreliable Word page detection with element-based limiting
- Cap extraction at 25 paragraphs per 'page' requested (max 100 total)
- Cap extraction at 8k chars per 'page' requested (max 40k total)
- Add early termination when limits reached
- Add processing_limits metadata to show actual extraction stats
- Prevent 1.28M token responses by stopping at reasonable content limits
- Single page (page_range='1') now limited to ~25 paragraphs/8k chars
2025-08-22 08:00:02 -06:00


"""MCP Office Tools Server - Comprehensive Microsoft Office document processing.
FastMCP server providing 30+ tools for processing Word, Excel, PowerPoint documents
including both modern formats (.docx, .xlsx, .pptx) and legacy formats (.doc, .xls, .ppt).
"""
import os
import tempfile
import time
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from pydantic import Field
from .utils import (
OfficeFileError,
classify_document_type,
detect_format,
get_supported_extensions,
resolve_office_file_path,
validate_office_file,
)
# Initialize FastMCP app
app = FastMCP("MCP Office Tools")
# Configuration
TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir())
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
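# Illustrative sketch of how a client might exercise these tools in-process. This is
# not used by the server; it assumes FastMCP 2.x's in-memory Client API
# (fastmcp.Client, Client.call_tool) and a hypothetical document path.
async def _example_client_usage() -> None:
    from fastmcp import Client  # assumption: available in FastMCP 2.x

    async with Client(app) as client:
        # Call the extract_text tool exactly as an MCP client would
        result = await client.call_tool("extract_text", {"file_path": "/tmp/report.docx"})
        print(result)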
@app.tool()
async def extract_text(
file_path: str = Field(description="Path to Office document or URL"),
preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"),
include_metadata: bool = Field(default=True, description="Include document metadata in output"),
method: str = Field(default="auto", description="Extraction method: auto, primary, fallback")
) -> dict[str, Any]:
"""Extract text content from Office documents with intelligent method selection.
Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt),
and CSV files. Uses multi-library fallback for maximum compatibility.
"""
start_time = time.time()
try:
# Resolve file path (download if URL)
local_path = await resolve_office_file_path(file_path)
# Validate file
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Get format info
format_info = await detect_format(local_path)
category = format_info["category"]
extension = format_info["extension"]
# Route to appropriate extraction method
if category == "word":
text_result = await _extract_word_text(local_path, extension, preserve_formatting, method)
elif category == "excel":
text_result = await _extract_excel_text(local_path, extension, preserve_formatting, method)
elif category == "powerpoint":
text_result = await _extract_powerpoint_text(local_path, extension, preserve_formatting, method)
else:
raise OfficeFileError(f"Unsupported document category: {category}")
# Compile results
result = {
"text": text_result["text"],
"method_used": text_result["method_used"],
"character_count": len(text_result["text"]),
"word_count": len(text_result["text"].split()) if text_result["text"] else 0,
"extraction_time": round(time.time() - start_time, 3),
"format_info": {
"format": format_info["format_name"],
"category": category,
"is_legacy": format_info["is_legacy"]
}
}
if include_metadata:
result["metadata"] = await _extract_basic_metadata(local_path, extension, category)
if preserve_formatting:
result["formatted_sections"] = text_result.get("formatted_sections", [])
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Text extraction failed: {str(e)}")
@app.tool()
async def extract_images(
file_path: str = Field(description="Path to Office document or URL"),
output_format: str = Field(default="png", description="Output image format: png, jpg, jpeg"),
min_width: int = Field(default=100, description="Minimum image width in pixels"),
min_height: int = Field(default=100, description="Minimum image height in pixels"),
include_metadata: bool = Field(default=True, description="Include image metadata")
) -> dict[str, Any]:
"""Extract images from Office documents with size filtering and format conversion."""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Validate file
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Get format info
format_info = await detect_format(local_path)
category = format_info["category"]
extension = format_info["extension"]
# Extract images based on format
if category == "word":
images = await _extract_word_images(local_path, extension, output_format, min_width, min_height)
elif category == "excel":
images = await _extract_excel_images(local_path, extension, output_format, min_width, min_height)
elif category == "powerpoint":
images = await _extract_powerpoint_images(local_path, extension, output_format, min_width, min_height)
else:
raise OfficeFileError(f"Image extraction not supported for category: {category}")
result = {
"images": images,
"image_count": len(images),
"extraction_time": round(time.time() - start_time, 3),
"format_info": {
"format": format_info["format_name"],
"category": category
}
}
if include_metadata:
result["total_size_bytes"] = sum(img.get("size_bytes", 0) for img in images)
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Image extraction failed: {str(e)}")
@app.tool()
async def extract_metadata(
file_path: str = Field(description="Path to Office document or URL")
) -> dict[str, Any]:
"""Extract comprehensive metadata from Office documents."""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Validate file
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Get format info
format_info = await detect_format(local_path)
category = format_info["category"]
extension = format_info["extension"]
# Extract metadata based on format
if category == "word":
metadata = await _extract_word_metadata(local_path, extension)
elif category == "excel":
metadata = await _extract_excel_metadata(local_path, extension)
elif category == "powerpoint":
metadata = await _extract_powerpoint_metadata(local_path, extension)
else:
metadata = {"category": category, "basic_info": "Limited metadata available"}
# Add file system metadata
path = Path(local_path)
stat = path.stat()
result = {
"document_metadata": metadata,
"file_metadata": {
"filename": path.name,
"file_size": stat.st_size,
"created": stat.st_ctime,
"modified": stat.st_mtime,
"extension": extension
},
"format_info": format_info,
"extraction_time": round(time.time() - start_time, 3)
}
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Metadata extraction failed: {str(e)}")
@app.tool()
async def detect_office_format(
file_path: str = Field(description="Path to Office document or URL")
) -> dict[str, Any]:
"""Intelligent Office document format detection and analysis."""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Detect format
format_info = await detect_format(local_path)
# Classify document
classification = await classify_document_type(local_path)
result = {
"format_detection": format_info,
"document_classification": classification,
"supported": format_info["is_supported"],
"processing_recommendations": format_info.get("processing_hints", []),
"detection_time": round(time.time() - start_time, 3)
}
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Format detection failed: {str(e)}")
@app.tool()
async def analyze_document_health(
file_path: str = Field(description="Path to Office document or URL")
) -> dict[str, Any]:
"""Comprehensive document health and integrity analysis."""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Validate file thoroughly
validation = await validate_office_file(local_path)
# Get format info
format_info = await detect_format(local_path)
# Health assessment
health_score = _calculate_health_score(validation, format_info)
result = {
"overall_health": "healthy" if validation["is_valid"] and health_score >= 8 else
"warning" if health_score >= 5 else "problematic",
"health_score": health_score,
"validation_results": validation,
"format_analysis": format_info,
"recommendations": _get_health_recommendations(validation, format_info),
"analysis_time": round(time.time() - start_time, 3)
}
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Health analysis failed: {str(e)}")
@app.tool()
async def convert_to_markdown(
file_path: str = Field(description="Path to Office document or URL"),
include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"),
image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"),
max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"),
preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"),
page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). RECOMMENDED for large documents. Empty = all pages"),
summary_only: bool = Field(default=False, description="Return only metadata and truncated summary. STRONGLY RECOMMENDED for large docs (>10 pages)"),
output_dir: str = Field(default="", description="Output directory for image files (if image_mode='files')")
) -> dict[str, Any]:
"""Convert Office documents to Markdown format with intelligent processing recommendations.
⚠️ RECOMMENDED WORKFLOW FOR LARGE DOCUMENTS (>5 pages):
1. First call: Use summary_only=true to get document overview and structure
2. Then: Use page_range (e.g., "1-10", "15-25") to process specific sections
This prevents response size errors and provides efficient processing.
Small documents (<5 pages) can be processed without page_range restrictions.
"""
start_time = time.time()
try:
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Validate file
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Get format info
format_info = await detect_format(local_path)
category = format_info["category"]
extension = format_info["extension"]
# Currently focused on Word documents for markdown conversion
if category != "word":
raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}")
# Analyze document size and provide intelligent recommendations
doc_analysis = await _analyze_document_size(local_path, extension)
processing_recommendation = _get_processing_recommendation(
doc_analysis, page_range, summary_only
)
# Parse page range if provided
page_numbers = _parse_page_range(page_range) if page_range else None
# Convert to markdown based on format
if extension == ".docx":
markdown_result = await _convert_docx_to_markdown(
local_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
else: # .doc
# For legacy .doc files, use mammoth if available
markdown_result = await _convert_doc_to_markdown(
local_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
# Build result based on mode
result = {
"metadata": {
"original_file": os.path.basename(local_path),
"format": format_info["format_name"],
"conversion_method": markdown_result["method_used"],
"conversion_time": round(time.time() - start_time, 3),
"summary_only": summary_only,
"document_analysis": doc_analysis,
"processing_recommendation": processing_recommendation
}
}
# Add page range info if used
if page_range:
result["metadata"]["page_range"] = page_range
result["metadata"]["pages_processed"] = len(page_numbers) if page_numbers else 0
# Add content based on mode
if summary_only:
# VERY restrictive summary mode to prevent massive responses
result["metadata"]["character_count"] = len(markdown_result["content"])
result["metadata"]["word_count"] = len(markdown_result["content"].split())
# Ultra-short summary (only 500 chars max)
result["summary"] = markdown_result["content"][:500] + "..." if len(markdown_result["content"]) > 500 else markdown_result["content"]
# Severely limit table of contents to prevent 1M+ token responses
if "table_of_contents" in markdown_result:
toc = markdown_result["table_of_contents"]
if "sections" in toc and len(toc["sections"]) > 20:
# Limit to first 20 sections only
limited_toc = {
"sections": toc["sections"][:20],
"total_sections": len(toc["sections"]),
"showing_first": 20,
"note": f"Showing first 20 of {len(toc['sections'])} sections. Use page_range to extract specific sections.",
"suggested_chunking": toc.get("suggested_chunking", [])[:10] # Limit chunking suggestions too
}
result["table_of_contents"] = limited_toc
else:
result["table_of_contents"] = toc
else:
# Include content with automatic size limiting to prevent MCP errors
content = markdown_result["content"]
# Apply aggressive content limiting to stay under 25k token limit
# Rough estimate: ~4 chars per token, leave buffer for metadata
max_content_chars = 80000 # ~20k tokens worth of content
if len(content) > max_content_chars:
# Truncate but try to preserve structure
truncated_content = _smart_truncate_content(content, max_content_chars)
result["markdown"] = truncated_content
result["content_truncated"] = True
result["original_length"] = len(content)
result["truncated_length"] = len(truncated_content)
result["truncation_note"] = f"Content truncated to stay under MCP 25k token limit. Original: {len(content):,} chars, Shown: {len(truncated_content):,} chars. Use smaller page ranges for full content."
else:
result["markdown"] = content
result["content_truncated"] = False
result["metadata"]["character_count"] = len(content)
result["metadata"]["word_count"] = len(content.split())
# Add image info
if include_images and markdown_result.get("images"):
result["images"] = markdown_result["images"]
result["metadata"]["image_count"] = len(markdown_result["images"])
result["metadata"]["total_image_size"] = sum(
img.get("size_bytes", 0) for img in markdown_result["images"]
)
# Add structure info
if preserve_structure and markdown_result.get("structure"):
result["structure"] = markdown_result["structure"]
return result
except Exception as e:
if DEBUG:
import traceback
traceback.print_exc()
raise OfficeFileError(f"Markdown conversion failed: {str(e)}")
@app.tool()
async def get_supported_formats() -> dict[str, Any]:
"""Get list of all supported Office document formats and their capabilities."""
extensions = get_supported_extensions()
format_details = {}
for ext in extensions:
from .utils.validation import get_format_info
info = get_format_info(ext)
if info:
format_details[ext] = {
"format_name": info["format_name"],
"category": info["category"],
"mime_types": info["mime_types"]
}
return {
"supported_extensions": extensions,
"format_details": format_details,
"categories": {
"word": [ext for ext, info in format_details.items() if info["category"] == "word"],
"excel": [ext for ext, info in format_details.items() if info["category"] == "excel"],
"powerpoint": [ext for ext, info in format_details.items() if info["category"] == "powerpoint"]
},
"total_formats": len(extensions)
}
# Helper functions for text extraction
async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
"""Extract text from Word documents with fallback methods."""
methods_tried = []
# Method selection
if method == "auto":
if extension == ".docx":
method_order = ["python-docx", "mammoth", "docx2txt"]
else: # .doc
method_order = ["olefile", "mammoth", "docx2txt"]
elif method == "primary":
method_order = ["python-docx"] if extension == ".docx" else ["olefile"]
else: # fallback
method_order = ["mammoth", "docx2txt"]
text = ""
formatted_sections = []
method_used = None
for method_name in method_order:
try:
methods_tried.append(method_name)
if method_name == "python-docx" and extension == ".docx":
import docx
doc = docx.Document(file_path)
paragraphs = []
for para in doc.paragraphs:
paragraphs.append(para.text)
if preserve_formatting:
formatted_sections.append({
"type": "paragraph",
"text": para.text,
"style": para.style.name if para.style else None
})
text = "\n".join(paragraphs)
method_used = "python-docx"
break
elif method_name == "mammoth":
import mammoth
with open(file_path, "rb") as docx_file:
if preserve_formatting:
result = mammoth.convert_to_html(docx_file)
text = result.value
formatted_sections.append({
"type": "html",
"content": result.value
})
else:
result = mammoth.extract_raw_text(docx_file)
text = result.value
method_used = "mammoth"
break
elif method_name == "docx2txt":
import docx2txt
text = docx2txt.process(file_path)
method_used = "docx2txt"
break
elif method_name == "olefile" and extension == ".doc":
# Basic text extraction for legacy .doc files
try:
import olefile
if olefile.isOleFile(file_path):
# This is a simplified approach - real .doc parsing is complex
with open(file_path, 'rb') as f:
content = f.read()
# Very basic text extraction attempt
text = content.decode('utf-8', errors='ignore')
# Clean up binary artifacts
import re
text = re.sub(r'[^\x20-\x7E\n\r\t]', '', text)
text = '\n'.join(line.strip() for line in text.split('\n') if line.strip())
method_used = "olefile"
break
except Exception:
continue
except ImportError:
continue
except Exception:
continue
if not method_used:
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
return {
"text": text,
"method_used": method_used,
"methods_tried": methods_tried,
"formatted_sections": formatted_sections
}
async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
"""Extract text from Excel documents."""
methods_tried = []
if extension == ".csv":
# CSV handling
import pandas as pd
try:
df = pd.read_csv(file_path)
text = df.to_string()
return {
"text": text,
"method_used": "pandas",
"methods_tried": ["pandas"],
"formatted_sections": [{"type": "table", "data": df.to_dict()}] if preserve_formatting else []
}
except Exception as e:
raise OfficeFileError(f"CSV processing failed: {str(e)}")
# Excel file handling
text = ""
formatted_sections = []
method_used = None
method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"]
for method_name in method_order:
try:
methods_tried.append(method_name)
if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]:
import openpyxl
wb = openpyxl.load_workbook(file_path, data_only=True)
text_parts = []
for sheet_name in wb.sheetnames:
ws = wb[sheet_name]
text_parts.append(f"Sheet: {sheet_name}")
for row in ws.iter_rows(values_only=True):
row_text = "\t".join(str(cell) if cell is not None else "" for cell in row)
if row_text.strip():
text_parts.append(row_text)
if preserve_formatting:
formatted_sections.append({
"type": "worksheet",
"name": sheet_name,
"data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()]
})
text = "\n".join(text_parts)
method_used = "openpyxl"
break
elif method_name == "pandas":
import pandas as pd
if extension in [".xlsx", ".xlsm"]:
dfs = pd.read_excel(file_path, sheet_name=None)
else: # .xls
dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd')
text_parts = []
for sheet_name, df in dfs.items():
text_parts.append(f"Sheet: {sheet_name}")
text_parts.append(df.to_string())
if preserve_formatting:
formatted_sections.append({
"type": "dataframe",
"name": sheet_name,
"data": df.to_dict()
})
text = "\n\n".join(text_parts)
method_used = "pandas"
break
elif method_name == "xlrd" and extension == ".xls":
import xlrd
wb = xlrd.open_workbook(file_path)
text_parts = []
for sheet in wb.sheets():
text_parts.append(f"Sheet: {sheet.name}")
for row_idx in range(sheet.nrows):
row = sheet.row_values(row_idx)
row_text = "\t".join(str(cell) for cell in row)
text_parts.append(row_text)
text = "\n".join(text_parts)
method_used = "xlrd"
break
except ImportError:
continue
except Exception:
continue
if not method_used:
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
return {
"text": text,
"method_used": method_used,
"methods_tried": methods_tried,
"formatted_sections": formatted_sections
}
async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
"""Extract text from PowerPoint documents."""
methods_tried = []
if extension == ".pptx":
try:
import pptx
prs = pptx.Presentation(file_path)
text_parts = []
formatted_sections = []
for slide_num, slide in enumerate(prs.slides, 1):
slide_text_parts = []
for shape in slide.shapes:
if hasattr(shape, "text") and shape.text:
slide_text_parts.append(shape.text)
slide_text = "\n".join(slide_text_parts)
text_parts.append(f"Slide {slide_num}:\n{slide_text}")
if preserve_formatting:
formatted_sections.append({
"type": "slide",
"number": slide_num,
"text": slide_text,
"shapes": len(slide.shapes)
})
text = "\n\n".join(text_parts)
return {
"text": text,
"method_used": "python-pptx",
"methods_tried": ["python-pptx"],
"formatted_sections": formatted_sections
}
except ImportError:
methods_tried.append("python-pptx")
except Exception:
methods_tried.append("python-pptx")
# Legacy .ppt handling would require additional libraries
if extension == ".ppt":
raise OfficeFileError("Legacy PowerPoint (.ppt) text extraction requires additional setup")
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
# Helper functions for image extraction
async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
"""Extract images from Word documents."""
images = []
if extension == ".docx":
try:
import io
import zipfile
from PIL import Image
with zipfile.ZipFile(file_path, 'r') as zip_file:
# Look for images in media folder
image_files = [f for f in zip_file.namelist() if f.startswith('word/media/')]
for i, img_path in enumerate(image_files):
try:
img_data = zip_file.read(img_path)
img = Image.open(io.BytesIO(img_data))
# Size filtering
if img.width >= min_width and img.height >= min_height:
# Save to temp file
temp_path = os.path.join(TEMP_DIR, f"word_image_{i}.{output_format}")
img.save(temp_path, format=output_format.upper())
images.append({
"index": i,
"filename": os.path.basename(img_path),
"path": temp_path,
"width": img.width,
"height": img.height,
"format": img.format,
"size_bytes": len(img_data)
})
except Exception:
continue
except Exception as e:
raise OfficeFileError(f"Word image extraction failed: {str(e)}")
return images
async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
"""Extract images from Excel documents."""
images = []
if extension in [".xlsx", ".xlsm"]:
try:
import io
import zipfile
from PIL import Image
with zipfile.ZipFile(file_path, 'r') as zip_file:
# Look for images in media folder
image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')]
for i, img_path in enumerate(image_files):
try:
img_data = zip_file.read(img_path)
img = Image.open(io.BytesIO(img_data))
# Size filtering
if img.width >= min_width and img.height >= min_height:
# Save to temp file
temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}")
img.save(temp_path, format=output_format.upper())
images.append({
"index": i,
"filename": os.path.basename(img_path),
"path": temp_path,
"width": img.width,
"height": img.height,
"format": img.format,
"size_bytes": len(img_data)
})
except Exception:
continue
except Exception as e:
raise OfficeFileError(f"Excel image extraction failed: {str(e)}")
return images
async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
"""Extract images from PowerPoint documents."""
images = []
if extension == ".pptx":
try:
import io
import zipfile
from PIL import Image
with zipfile.ZipFile(file_path, 'r') as zip_file:
# Look for images in media folder
image_files = [f for f in zip_file.namelist() if f.startswith('ppt/media/')]
for i, img_path in enumerate(image_files):
try:
img_data = zip_file.read(img_path)
img = Image.open(io.BytesIO(img_data))
# Size filtering
if img.width >= min_width and img.height >= min_height:
# Save to temp file
temp_path = os.path.join(TEMP_DIR, f"powerpoint_image_{i}.{output_format}")
img.save(temp_path, format=output_format.upper())
images.append({
"index": i,
"filename": os.path.basename(img_path),
"path": temp_path,
"width": img.width,
"height": img.height,
"format": img.format,
"size_bytes": len(img_data)
})
except Exception:
continue
except Exception as e:
raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}")
return images
# Helper functions for metadata extraction
async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]:
"""Extract basic metadata from Office documents."""
metadata = {"category": category, "extension": extension}
try:
if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]:
import zipfile
with zipfile.ZipFile(file_path, 'r') as zip_file:
# Core properties
if 'docProps/core.xml' in zip_file.namelist():
metadata["has_core_properties"] = True
# App properties
if 'docProps/app.xml' in zip_file.namelist():
metadata["has_app_properties"] = True
except Exception:
pass
return metadata
async def _extract_word_metadata(file_path: str, extension: str) -> dict[str, Any]:
"""Extract Word-specific metadata."""
metadata = {"type": "word", "extension": extension}
if extension == ".docx":
try:
import docx
doc = docx.Document(file_path)
core_props = doc.core_properties
metadata.update({
"title": core_props.title,
"author": core_props.author,
"subject": core_props.subject,
"keywords": core_props.keywords,
"comments": core_props.comments,
"created": str(core_props.created) if core_props.created else None,
"modified": str(core_props.modified) if core_props.modified else None
})
# Document structure
metadata.update({
"paragraph_count": len(doc.paragraphs),
"section_count": len(doc.sections),
"has_tables": len(doc.tables) > 0,
"table_count": len(doc.tables)
})
except Exception:
pass
return metadata
async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]:
"""Extract Excel-specific metadata."""
metadata = {"type": "excel", "extension": extension}
if extension in [".xlsx", ".xlsm"]:
try:
import openpyxl
wb = openpyxl.load_workbook(file_path)
props = wb.properties
metadata.update({
"title": props.title,
"creator": props.creator,
"subject": props.subject,
"description": props.description,
"keywords": props.keywords,
"created": str(props.created) if props.created else None,
"modified": str(props.modified) if props.modified else None
})
# Workbook structure
metadata.update({
"worksheet_count": len(wb.worksheets),
"worksheet_names": wb.sheetnames,
"has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets),
"has_images": any(len(ws._images) > 0 for ws in wb.worksheets)
})
except Exception:
pass
return metadata
async def _extract_powerpoint_metadata(file_path: str, extension: str) -> dict[str, Any]:
"""Extract PowerPoint-specific metadata."""
metadata = {"type": "powerpoint", "extension": extension}
if extension == ".pptx":
try:
import pptx
prs = pptx.Presentation(file_path)
core_props = prs.core_properties
metadata.update({
"title": core_props.title,
"author": core_props.author,
"subject": core_props.subject,
"keywords": core_props.keywords,
"comments": core_props.comments,
"created": str(core_props.created) if core_props.created else None,
"modified": str(core_props.modified) if core_props.modified else None
})
# Presentation structure
slide_layouts = set()
total_shapes = 0
for slide in prs.slides:
slide_layouts.add(slide.slide_layout.name)
total_shapes += len(slide.shapes)
metadata.update({
"slide_count": len(prs.slides),
"slide_layouts": list(slide_layouts),
"total_shapes": total_shapes,
"slide_width": prs.slide_width,
"slide_height": prs.slide_height
})
except Exception:
pass
return metadata
def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int:
"""Calculate document health score (1-10)."""
score = 10
# Deduct for validation errors
if not validation["is_valid"]:
score -= 5
if validation["errors"]:
score -= len(validation["errors"]) * 2
if validation["warnings"]:
score -= len(validation["warnings"])
# Deduct for problematic characteristics
if validation.get("password_protected"):
score -= 1
if format_info.get("is_legacy"):
score -= 1
structure = format_info.get("structure", {})
if structure.get("estimated_complexity") == "complex":
score -= 1
return max(1, min(10, score))
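# Hedged worked example of the scoring rules above (hypothetical inputs, never called
# by the server): a valid but legacy, password-protected file with one warning scores
# 10 - 1 (warning) - 1 (password) - 1 (legacy) = 7, which analyze_document_health
# reports as "warning".
def _example_health_score() -> int:
    validation = {"is_valid": True, "errors": [], "warnings": ["legacy format"], "password_protected": True}
    format_info = {"is_legacy": True, "structure": {"estimated_complexity": "simple"}}
    return _calculate_health_score(validation, format_info)  # == 7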
def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]:
"""Get health improvement recommendations."""
recommendations = []
if validation["errors"]:
recommendations.append("Fix validation errors before processing")
if validation.get("password_protected"):
recommendations.append("Remove password protection if possible")
if format_info.get("is_legacy"):
recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)")
structure = format_info.get("structure", {})
if structure.get("estimated_complexity") == "complex":
recommendations.append("Complex document may require specialized processing")
if not recommendations:
recommendations.append("Document appears healthy and ready for processing")
return recommendations
# Markdown conversion helper functions
async def _convert_docx_to_markdown(
file_path: str,
include_images: bool,
image_mode: str,
max_image_size: int,
preserve_structure: bool,
page_numbers: list[int],
summary_only: bool,
output_dir: str
) -> dict[str, Any]:
"""Convert .docx file to markdown with comprehensive feature support."""
import base64
# ULTRA-FAST summary mode - skip all complex processing
if summary_only:
return await _get_ultra_fast_summary(file_path)
# If page_numbers is specified, we need to use python-docx for page-based extraction
# as mammoth processes the entire document
if page_numbers:
return await _convert_docx_with_python_docx(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
try:
# Try mammoth first for better HTML->Markdown conversion (full document only)
import mammoth
# Configure mammoth for markdown-friendly output
with open(file_path, "rb") as docx_file:
if include_images:
# Extract images and handle them based on mode
images_info = []
def convert_image(image):
image_data = image.open()
content_type = image.content_type
ext = content_type.split('/')[-1] if '/' in content_type else 'png'
if image_mode == "base64":
if len(image_data) <= max_image_size:
encoded = base64.b64encode(image_data).decode('utf-8')
images_info.append({
"filename": f"image_{len(images_info)}.{ext}",
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "base64"
})
return {
"src": f"data:{content_type};base64,{encoded}"
}
else:
# Too large for base64, fall back to reference
filename = f"large_image_{len(images_info)}.{ext}"
images_info.append({
"filename": filename,
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "reference",
"note": "Too large for base64 encoding"
})
return {"src": filename}
elif image_mode == "files":
# Save image to file
nonlocal output_dir
if not output_dir:
output_dir = os.path.join(TEMP_DIR, "markdown_images")
os.makedirs(output_dir, exist_ok=True)
filename = f"image_{len(images_info)}.{ext}"
# Use a distinct name to avoid shadowing the enclosing file_path argument
image_path = os.path.join(output_dir, filename)
with open(image_path, 'wb') as img_file:
img_file.write(image_data)
images_info.append({
"filename": filename,
"file_path": image_path,
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "file"
})
return {"src": image_path}
else: # references
filename = f"image_{len(images_info)}.{ext}"
images_info.append({
"filename": filename,
"content_type": content_type,
"size_bytes": len(image_data),
"mode": "reference"
})
return {"src": filename}
# Convert with image handling
result = mammoth.convert_to_html(
docx_file,
convert_image=mammoth.images.img_element(convert_image)
)
html_content = result.value
markdown_content = _html_to_markdown(html_content, preserve_structure)
conversion_result = {
"content": markdown_content,
"method_used": "mammoth-with-images",
"images": images_info
}
else:
# Convert without images
result = mammoth.convert_to_markdown(docx_file)
markdown_content = result.value
conversion_result = {
"content": markdown_content,
"method_used": "mammoth-markdown",
"images": []
}
# Note: summary_only is handled by the early return to _get_ultra_fast_summary above, so no extra truncation is needed here
# Extract structure information
if preserve_structure:
structure = _extract_markdown_structure(markdown_content)
conversion_result["structure"] = structure
return conversion_result
except ImportError:
# Fall back to python-docx with custom markdown conversion
return await _convert_docx_with_python_docx(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
except Exception:
# Fall back to python-docx
return await _convert_docx_with_python_docx(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
async def _convert_docx_with_python_docx(
file_path: str,
include_images: bool,
image_mode: str,
max_image_size: int,
preserve_structure: bool,
page_numbers: list[int],
summary_only: bool,
output_dir: str
) -> dict[str, Any]:
"""Convert .docx using python-docx with custom markdown conversion."""
import base64
import docx
from docx.oxml.table import CT_Tbl
from docx.oxml.text.paragraph import CT_P
from docx.table import Table
from docx.text.paragraph import Paragraph
doc = docx.Document(file_path)
markdown_parts = []
images_info = []
structure_info = {"headings": [], "tables": 0, "lists": 0, "paragraphs": 0}
# Extract images if requested
if include_images:
extracted_images = await _extract_word_images(file_path, ".docx", "png", 1, 1)
for i, img in enumerate(extracted_images):
if image_mode == "base64":
if img.get("size_bytes", 0) <= max_image_size:
with open(img["path"], "rb") as img_file:
img_data = img_file.read()
encoded = base64.b64encode(img_data).decode('utf-8')
images_info.append({
"filename": img["filename"],
"content_type": f"image/{img.get('format', 'png').lower()}",
"size_bytes": img.get("size_bytes", 0),
"mode": "base64",
"markdown_ref": f"![Image {i+1}](data:image/{img.get('format', 'png').lower()};base64,{encoded})"
})
else:
images_info.append({
"filename": img["filename"],
"size_bytes": img.get("size_bytes", 0),
"mode": "reference",
"markdown_ref": f"![Image {i+1}]({img['filename']})",
"note": "Too large for base64 encoding"
})
elif image_mode == "files":
images_info.append({
"filename": img["filename"],
"file_path": img["path"],
"size_bytes": img.get("size_bytes", 0),
"mode": "file",
"markdown_ref": f"![Image {i+1}]({img['path']})"
})
else: # references
images_info.append({
"filename": img["filename"],
"size_bytes": img.get("size_bytes", 0),
"mode": "reference",
"markdown_ref": f"![Image {i+1}]({img['filename']})"
})
# Process document elements with aggressive content limiting
# Since Word page detection is unreliable, use element-based limiting
if page_numbers:
# For page ranges, severely limit content extraction
max_pages_requested = max(page_numbers) if page_numbers else 1
# Rough estimate: ~20-30 paragraphs per page
max_paragraphs = min(max_pages_requested * 25, 100) # Cap at 100 paragraphs max
max_chars = min(max_pages_requested * 8000, 40000) # Cap at 40k chars max
else:
max_paragraphs = 1000 # Large limit for full document
max_chars = 200000
current_page = 1
processed_paragraphs = 0
total_chars = 0
include_current_page = not page_numbers or current_page in page_numbers
table_of_contents = [] # Track headings with page numbers for TOC
for element in doc.element.body:
# Early termination if we've processed enough content
if processed_paragraphs >= max_paragraphs or total_chars >= max_chars:
break
if isinstance(element, CT_P):
paragraph = Paragraph(element, doc)
# Check for page breaks
if _has_page_break(paragraph):
current_page += 1
include_current_page = not page_numbers or current_page in page_numbers
continue
# Process content with strict limits
markdown_text = _paragraph_to_markdown(paragraph, preserve_structure)
if markdown_text.strip():
# Check if adding this would exceed limits
text_length = len(markdown_text)
if total_chars + text_length > max_chars:
break # Stop processing
markdown_parts.append(markdown_text)
processed_paragraphs += 1
total_chars += text_length
structure_info["paragraphs"] += 1
# Track headings for both structure and TOC
if preserve_structure and markdown_text.startswith('#'):
level = len(markdown_text) - len(markdown_text.lstrip('#'))
heading_text = markdown_text.lstrip('# ').strip()
heading_info = {
"level": level,
"text": heading_text,
"position": len(markdown_parts) - 1,
"page": current_page
}
structure_info["headings"].append(heading_info)
# Add to table of contents
table_of_contents.append({
"level": level,
"title": heading_text,
"page": current_page,
"suggested_page_range": f"{current_page}-{current_page + _estimate_section_length(level)}"
})
elif isinstance(element, CT_Tbl):
# Process tables with strict limits
if processed_paragraphs < max_paragraphs and total_chars < max_chars:
table = Table(element, doc)
table_markdown = _table_to_markdown(table)
if table_markdown.strip():
table_length = len(table_markdown)
if total_chars + table_length > max_chars:
break # Stop processing
markdown_parts.append(table_markdown)
total_chars += table_length
structure_info["tables"] += 1
# Add image references at the end if any
if include_images and images_info:
markdown_parts.append("\n## Images\n")
for img in images_info:
markdown_parts.append(img["markdown_ref"])
markdown_content = "\n\n".join(markdown_parts)
result = {
"content": markdown_content,
"method_used": "python-docx-custom",
"images": images_info
}
# Add table of contents for navigation
if table_of_contents:
result["table_of_contents"] = _optimize_toc_page_ranges(table_of_contents)
# Add processing limits info
result["processing_limits"] = {
"max_paragraphs_allowed": max_paragraphs,
"max_chars_allowed": max_chars,
"paragraphs_processed": processed_paragraphs,
"chars_processed": total_chars,
"content_truncated": processed_paragraphs >= max_paragraphs or total_chars >= max_chars,
"note": f"Processed {processed_paragraphs}/{max_paragraphs} paragraphs, {total_chars:,}/{max_chars:,} chars"
}
# Add page filtering info
if page_numbers:
result["pages_processed"] = page_numbers
result["total_pages_in_range"] = len(page_numbers)
# Handle summary mode
if summary_only and len(markdown_content) > 5000:
markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
# Update the result content
result["content"] = markdown_content
# Add structure info
if preserve_structure:
result["structure"] = structure_info
return result
async def _convert_doc_to_markdown(
file_path: str,
include_images: bool,
image_mode: str,
max_image_size: int,
preserve_structure: bool,
page_numbers: list[int],
summary_only: bool,
output_dir: str
) -> dict[str, Any]:
"""Convert legacy .doc file to markdown using available methods."""
try:
import mammoth
with open(file_path, "rb") as doc_file:
result = mammoth.convert_to_markdown(doc_file)
markdown_content = result.value
conversion_result = {
"content": markdown_content,
"method_used": "mammoth-doc",
"images": [] # Legacy .doc image extraction is complex
}
# Handle summary mode
if summary_only and len(markdown_content) > 5000:
markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
# Update the conversion result
conversion_result["content"] = markdown_content
if preserve_structure:
structure = _extract_markdown_structure(markdown_content)
conversion_result["structure"] = structure
return conversion_result
except ImportError:
raise OfficeFileError("Legacy .doc conversion requires mammoth library")
except Exception as e:
raise OfficeFileError(f"Legacy .doc conversion failed: {str(e)}")
def _paragraph_to_markdown(paragraph, preserve_structure: bool) -> str:
"""Convert a Word paragraph to markdown format."""
text = paragraph.text.strip()
if not text:
return ""
if not preserve_structure:
return text
# Handle different paragraph styles
style_name = paragraph.style.name.lower() if paragraph.style else ""
if "heading" in style_name:
# Extract heading level from style name
import re
level_match = re.search(r'(\d+)', style_name)
level = int(level_match.group(1)) if level_match else 1
return f"{'#' * level} {text}"
elif "title" in style_name:
return f"# {text}"
elif "subtitle" in style_name:
return f"## {text}"
elif style_name in ["list paragraph", "list"]:
return f"- {text}"
elif "quote" in style_name:
return f"> {text}"
else:
return text
def _table_to_markdown(table) -> str:
"""Convert a Word table to markdown format."""
markdown_rows = []
for i, row in enumerate(table.rows):
cells = [cell.text.strip().replace('\n', ' ') for cell in row.cells]
markdown_row = "| " + " | ".join(cells) + " |"
markdown_rows.append(markdown_row)
# Add header separator after first row
if i == 0:
separator = "| " + " | ".join(["---"] * len(cells)) + " |"
markdown_rows.append(separator)
return "\n".join(markdown_rows)
def _html_to_markdown(html_content: str, preserve_structure: bool) -> str:
"""Convert HTML content to markdown format."""
import re
# Basic HTML to Markdown conversions
conversions = [
(r'<h1[^>]*>(.*?)</h1>', r'# \1'),
(r'<h2[^>]*>(.*?)</h2>', r'## \1'),
(r'<h3[^>]*>(.*?)</h3>', r'### \1'),
(r'<h4[^>]*>(.*?)</h4>', r'#### \1'),
(r'<h5[^>]*>(.*?)</h5>', r'##### \1'),
(r'<h6[^>]*>(.*?)</h6>', r'###### \1'),
(r'<strong[^>]*>(.*?)</strong>', r'**\1**'),
(r'<b[^>]*>(.*?)</b>', r'**\1**'),
(r'<em[^>]*>(.*?)</em>', r'*\1*'),
(r'<i[^>]*>(.*?)</i>', r'*\1*'),
(r'<code[^>]*>(.*?)</code>', r'`\1`'),
(r'<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', r'[\2](\1)'),
(r'<img[^>]*src="([^"]*)"[^>]*/?>', r'![](\1)'),
(r'<p[^>]*>(.*?)</p>', r'\1\n'),
(r'<br[^>]*/?>', r'\n'),
(r'<li[^>]*>(.*?)</li>', r'- \1'),
(r'<ul[^>]*>(.*?)</ul>', r'\1'),
(r'<ol[^>]*>(.*?)</ol>', r'\1'),
(r'<blockquote[^>]*>(.*?)</blockquote>', r'> \1'),
]
markdown = html_content
for pattern, replacement in conversions:
markdown = re.sub(pattern, replacement, markdown, flags=re.DOTALL | re.IGNORECASE)
# Clean up extra whitespace
markdown = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown)
markdown = re.sub(r'^\s+|\s+$', '', markdown, flags=re.MULTILINE)
return markdown
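# Hedged illustration of the regex-based conversion above; the HTML snippet is hypothetical
# and the function is not called by the server.
def _example_html_to_markdown() -> str:
    html = '<h2>Results</h2>\n<p>See <a href="https://example.com">the report</a> for <strong>details</strong>.</p>'
    # Roughly: "## Results\nSee [the report](https://example.com) for **details**."
    return _html_to_markdown(html, preserve_structure=True)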
def _chunk_markdown(content: str, chunk_size: int) -> list[dict[str, Any]]:
"""Split markdown content into chunks while preserving structure."""
chunks = []
lines = content.split('\n')
current_chunk = []
current_size = 0
chunk_num = 1
for line in lines:
line_size = len(line) + 1 # +1 for newline
# If adding this line would exceed chunk size and we have content
if current_size + line_size > chunk_size and current_chunk:
chunks.append({
"chunk_number": chunk_num,
"content": '\n'.join(current_chunk),
"character_count": current_size,
"line_count": len(current_chunk)
})
current_chunk = []
current_size = 0
chunk_num += 1
current_chunk.append(line)
current_size += line_size
# Add final chunk if there's remaining content
if current_chunk:
chunks.append({
"chunk_number": chunk_num,
"content": '\n'.join(current_chunk),
"character_count": current_size,
"line_count": len(current_chunk)
})
return chunks
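# Hedged illustration of the chunking helper (not called by the server); the content
# below is synthetic.
def _example_chunk_markdown() -> list[dict[str, Any]]:
    content = "\n".join(f"## Section {i}\n" + "lorem ipsum " * 40 for i in range(10))
    # A 1,000-character budget yields several chunks, each reporting its own
    # character_count and line_count.
    return _chunk_markdown(content, chunk_size=1000)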
def _extract_markdown_structure(content: str) -> dict[str, Any]:
"""Extract structure information from markdown content."""
import re
structure = {
"headings": [],
"lists": 0,
"links": 0,
"images": 0,
"code_blocks": 0,
"tables": 0,
"line_count": len(content.split('\n'))
}
lines = content.split('\n')
for i, line in enumerate(lines):
# Find headings
heading_match = re.match(r'^(#{1,6})\s+(.+)', line)
if heading_match:
level = len(heading_match.group(1))
text = heading_match.group(2).strip()
structure["headings"].append({
"level": level,
"text": text,
"line_number": i + 1
})
# Count other elements
if re.match(r'^[-*+]\s+', line):
structure["lists"] += 1
structure["links"] += len(re.findall(r'\[([^\]]+)\]\([^)]+\)', line))
structure["images"] += len(re.findall(r'!\[([^\]]*)\]\([^)]+\)', line))
if line.strip().startswith('```'):
structure["code_blocks"] += 1
if '|' in line and line.count('|') >= 2:
structure["tables"] += 1
return structure
async def _get_ultra_fast_summary(file_path: str) -> dict[str, Any]:
"""Ultra-fast summary that extracts minimal data to prevent MCP token limits."""
try:
import docx
doc = docx.Document(file_path)
# Extract only the first few paragraphs and major headings
content_parts = []
heading_count = 0
paragraph_count = 0
max_content_length = 2000 # Very short limit
current_length = 0
# Get basic structure info quickly
total_paragraphs = len(doc.paragraphs)
total_tables = len(doc.tables)
# Extract bookmarks (chapter markers)
bookmarks = []
try:
# Access the document's bookmarks through the XML; python-docx's element.xpath() already maps the 'w' prefix, so no namespaces argument is needed (passing one raises TypeError)
for bookmark in doc.element.xpath('//w:bookmarkStart'):
bookmark_name = bookmark.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name')
if bookmark_name and not bookmark_name.startswith('_'): # Skip system bookmarks
bookmarks.append(bookmark_name)
except Exception:
pass # Bookmarks extraction failed, continue without
# Extract just a few key headings and the start of content
for para in doc.paragraphs[:50]: # Only check first 50 paragraphs
text = para.text.strip()
if not text:
continue
# Check if it's a heading (simple heuristic)
is_heading = (para.style and "heading" in para.style.name.lower()) or len(text) < 100
if is_heading and heading_count < 10: # Max 10 headings
content_parts.append(f"# {text}")
heading_count += 1
current_length += len(text) + 3
elif paragraph_count < 5 and current_length < max_content_length: # Max 5 paragraphs
content_parts.append(text)
paragraph_count += 1
current_length += len(text)
if current_length > max_content_length:
break
# Create very basic summary
summary_content = "\n\n".join(content_parts)
return {
"content": summary_content,
"method_used": "ultra-fast-summary",
"table_of_contents": {
"note": "Use full document processing for detailed TOC",
"basic_info": f"Document has ~{total_paragraphs} paragraphs, {total_tables} tables, {heading_count} headings found in first scan",
"bookmarks": bookmarks[:20] if bookmarks else [], # Limit to first 20 bookmarks
"bookmark_count": len(bookmarks),
"bookmark_note": "Bookmarks often indicate chapter starts. Use these as navigation hints for page_range extraction."
}
}
except Exception as e:
return {
"content": f"Error creating summary: {str(e)}",
"method_used": "error-fallback",
"table_of_contents": {"note": "Summary generation failed"}
}
def _smart_truncate_content(content: str, max_chars: int) -> str:
"""Intelligently truncate content while preserving structure and readability."""
if len(content) <= max_chars:
return content
lines = content.split('\n')
truncated_lines = []
current_length = 0
# Try to preserve structure by stopping at a natural break point
for line in lines:
line_length = len(line) + 1 # +1 for newline
# If adding this line would exceed limit
if current_length + line_length > max_chars:
# Try to find a good stopping point
if truncated_lines:
# If we stopped mid-paragraph, remove incomplete paragraph
if not (line.strip() == '' or line.startswith('#') or line.startswith('|')):
# Remove lines until we hit a natural break
while truncated_lines and not (
truncated_lines[-1].strip() == '' or
truncated_lines[-1].startswith('#') or
truncated_lines[-1].startswith('|') or
truncated_lines[-1].startswith('-') or
truncated_lines[-1].startswith('*')
):
truncated_lines.pop()
break
truncated_lines.append(line)
current_length += line_length
# Add truncation notice
result = '\n'.join(truncated_lines)
result += f"\n\n---\n**[CONTENT TRUNCATED]**\nShowing {len(result):,} of {len(content):,} characters.\nUse smaller page ranges (e.g., 3-5 pages) for full content without truncation.\n---"
return result
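# Hedged illustration (synthetic input, not called by the server): truncation stops at
# a structural boundary and appends the "[CONTENT TRUNCATED]" notice produced above.
def _example_smart_truncate() -> str:
    text = "\n\n".join(f"# Heading {i}\n" + "word " * 300 for i in range(5))
    return _smart_truncate_content(text, max_chars=2000)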
def _estimate_section_length(heading_level: int) -> int:
"""Estimate how many pages a section might span based on heading level."""
# Higher level headings (H1) tend to have longer sections
if heading_level == 1: # Major chapters
return 8
elif heading_level == 2: # Major sections
return 4
elif heading_level == 3: # Subsections
return 2
else: # Minor headings
return 1
def _optimize_toc_page_ranges(toc_entries: list) -> dict[str, Any]:
"""Optimize table of contents page ranges based on actual heading positions."""
optimized_toc = {
"sections": [],
"total_sections": len(toc_entries),
"suggested_chunking": []
}
for i, entry in enumerate(toc_entries):
# Calculate actual end page based on next heading or document end
if i + 1 < len(toc_entries):
next_page = toc_entries[i + 1]["page"]
actual_end_page = max(entry["page"], next_page - 1)
else:
# Last section - use estimated length
actual_end_page = entry["page"] + _estimate_section_length(entry["level"])
optimized_entry = {
"level": entry["level"],
"title": entry["title"],
"start_page": entry["page"],
"estimated_end_page": actual_end_page,
"suggested_page_range": f"{entry['page']}-{actual_end_page}",
"section_type": _classify_section_type(entry["level"], entry["title"])
}
optimized_toc["sections"].append(optimized_entry)
# Generate chunking suggestions
optimized_toc["suggested_chunking"] = _generate_chunking_suggestions(optimized_toc["sections"])
return optimized_toc
def _classify_section_type(level: int, title: str) -> str:
"""Classify section type based on level and title patterns."""
title_lower = title.lower()
if level == 1:
if any(word in title_lower for word in ["chapter", "part", "section"]):
return "chapter"
elif any(word in title_lower for word in ["introduction", "conclusion", "summary"]):
return "special_section"
else:
return "major_section"
elif level == 2:
return "section"
elif level == 3:
return "subsection"
else:
return "minor_heading"
def _generate_chunking_suggestions(sections: list) -> list[dict[str, Any]]:
"""Generate smart chunking suggestions based on document structure."""
suggestions = []
current_chunk_pages = 0
chunk_start = 1
chunk_sections = []
for section in sections:
section_pages = section["estimated_end_page"] - section["start_page"] + 1
# If adding this section would make chunk too large, finalize current chunk
# Use smaller chunks (8 pages) to prevent MCP token limit issues
if current_chunk_pages + section_pages > 8 and chunk_sections:
suggestions.append({
"chunk_number": len(suggestions) + 1,
"page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}",
"sections_included": [s["title"] for s in chunk_sections],
"estimated_pages": current_chunk_pages,
"description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" +
(f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "")
})
# Start new chunk
chunk_start = section["start_page"]
current_chunk_pages = section_pages
chunk_sections = [section]
else:
# Add to current chunk
current_chunk_pages += section_pages
chunk_sections.append(section)
# Add final chunk if any sections remain
if chunk_sections:
suggestions.append({
"chunk_number": len(suggestions) + 1,
"page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}",
"sections_included": [s["title"] for s in chunk_sections],
"estimated_pages": current_chunk_pages,
"description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" +
(f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "")
})
return suggestions
def _has_page_break(paragraph) -> bool:
"""Check if a paragraph contains a page break."""
try:
# Check for explicit page breaks in paragraph runs
for run in paragraph.runs:
br_elem = run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br')
if br_elem is not None and br_elem.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}type') == 'page':
return True
return False
except Exception:
return False
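# For reference: the explicit break this looks for is serialized in OOXML as
#   <w:br w:type="page"/>
# inside a run (<w:r>). Soft page breaks computed by Word's layout engine are not
# stored in the document XML at all, which is why page-based extraction is unreliable
# and the converter falls back to element/character limits.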
def _parse_page_range(page_range: str) -> list[int]:
"""Parse page range string into list of page numbers.
Examples:
"1-5" -> [1, 2, 3, 4, 5]
"1,3,5" -> [1, 3, 5]
"1-3,5,7-9" -> [1, 2, 3, 5, 7, 8, 9]
"""
pages = set()
for part in page_range.split(','):
part = part.strip()
if '-' in part:
# Handle range like "1-5"
start, end = part.split('-', 1)
try:
start_num = int(start.strip())
end_num = int(end.strip())
pages.update(range(start_num, end_num + 1))
except ValueError:
continue
else:
# Handle single page like "3"
try:
pages.add(int(part))
except ValueError:
continue
return sorted(list(pages))
async def _analyze_document_size(file_path: str, extension: str) -> dict[str, Any]:
"""Analyze document to estimate size and complexity."""
analysis = {
"estimated_pages": 1,
"file_size_mb": 0,
"complexity": "simple",
"estimated_content_size": "small"
}
try:
# Get file size
from pathlib import Path
file_size = Path(file_path).stat().st_size
analysis["file_size_mb"] = round(file_size / (1024 * 1024), 2)
if extension == ".docx":
try:
import docx
doc = docx.Document(file_path)
# Estimate pages based on content
paragraph_count = len(doc.paragraphs)
table_count = len(doc.tables)
# Rough estimation: ~40 paragraphs per page
estimated_pages = max(1, paragraph_count // 40)
analysis["estimated_pages"] = estimated_pages
# Determine complexity
if table_count > 10 or paragraph_count > 500:
analysis["complexity"] = "complex"
elif table_count > 5 or paragraph_count > 200:
analysis["complexity"] = "moderate"
# Estimate content size
if estimated_pages > 20:
analysis["estimated_content_size"] = "very_large"
elif estimated_pages > 10:
analysis["estimated_content_size"] = "large"
elif estimated_pages > 5:
analysis["estimated_content_size"] = "medium"
except Exception:
# Fallback to file size estimation
if file_size > 5 * 1024 * 1024: # 5MB
analysis["estimated_pages"] = 50
analysis["estimated_content_size"] = "very_large"
elif file_size > 1 * 1024 * 1024: # 1MB
analysis["estimated_pages"] = 20
analysis["estimated_content_size"] = "large"
elif file_size > 500 * 1024: # 500KB
analysis["estimated_pages"] = 10
analysis["estimated_content_size"] = "medium"
except Exception:
pass
return analysis
def _get_processing_recommendation(
doc_analysis: dict[str, Any],
page_range: str,
summary_only: bool
) -> dict[str, Any]:
"""Generate intelligent processing recommendations based on document analysis."""
estimated_pages = doc_analysis["estimated_pages"]
content_size = doc_analysis["estimated_content_size"]
recommendation = {
"status": "optimal",
"message": "",
"suggested_workflow": [],
"warnings": []
}
# Large document recommendations
if content_size in ["large", "very_large"] and not page_range and not summary_only:
recommendation["status"] = "suboptimal"
recommendation["message"] = (
f"⚠️ Large document detected ({estimated_pages} estimated pages). "
"Consider using recommended workflow for better performance."
)
recommendation["suggested_workflow"] = [
"1. First: Call with summary_only=true to get document overview and TOC",
"2. Then: Use page_range to process specific sections (e.g., '1-5', '6-10', '15-20')",
"3. Recommended: Use 3-8 page chunks to stay under 25k token MCP limit",
"4. The tool auto-truncates if content is too large, but smaller ranges work better"
]
recommendation["warnings"] = [
"Page ranges >8 pages may hit 25k token response limit and get truncated",
"Use smaller page ranges (3-5 pages) for dense content documents",
"Auto-truncation preserves structure but loses content completeness"
]
# Medium document recommendations
elif content_size == "medium" and not page_range and not summary_only:
recommendation["status"] = "caution"
recommendation["message"] = (
f"Medium document detected ({estimated_pages} estimated pages). "
"Consider summary_only=true first if you encounter response size issues."
)
recommendation["suggested_workflow"] = [
"Option 1: Try full processing (current approach)",
"Option 2: Use summary_only=true first, then page_range if needed"
]
# Optimal usage patterns
elif summary_only:
recommendation["message"] = "✅ Excellent! Using summary mode for initial document analysis."
recommendation["suggested_workflow"] = [
"After reviewing summary, use page_range to extract specific sections of interest"
]
elif page_range and content_size in ["large", "very_large"]:
recommendation["message"] = "✅ Perfect! Using page-range processing for efficient extraction."
elif content_size == "small":
recommendation["message"] = "✅ Small document - full processing is optimal."
return recommendation
def main():
"""Main entry point for the MCP server."""
import sys
if len(sys.argv) > 1 and sys.argv[1] == "--version":
from . import __version__
print(f"MCP Office Tools v{__version__}")
return
# Run the FastMCP server
app.run()
if __name__ == "__main__":
main()