diff --git a/src/mcp_office_tools/mixins/universal.py b/src/mcp_office_tools/mixins/universal.py index 0f6906e..fc01060 100644 --- a/src/mcp_office_tools/mixins/universal.py +++ b/src/mcp_office_tools/mixins/universal.py @@ -293,7 +293,7 @@ class UniversalMixin(MCPMixin): async def _extract_text_by_category(self, file_path: str, extension: str, category: str, preserve_formatting: bool, method: str) -> dict[str, Any]: """Extract text based on document category.""" # Import the appropriate extraction function - from ..server_monolithic import _extract_word_text, _extract_excel_text, _extract_powerpoint_text + from ..utils import _extract_word_text, _extract_excel_text, _extract_powerpoint_text if category == "word": return await _extract_word_text(file_path, extension, preserve_formatting, method) @@ -306,7 +306,7 @@ class UniversalMixin(MCPMixin): async def _extract_images_by_category(self, file_path: str, extension: str, category: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: """Extract images based on document category.""" - from ..server_monolithic import _extract_word_images, _extract_excel_images, _extract_powerpoint_images + from ..utils import _extract_word_images, _extract_excel_images, _extract_powerpoint_images if category == "word": return await _extract_word_images(file_path, extension, output_format, min_width, min_height) @@ -319,7 +319,7 @@ class UniversalMixin(MCPMixin): async def _extract_metadata_by_category(self, file_path: str, extension: str, category: str) -> dict[str, Any]: """Extract metadata based on document category.""" - from ..server_monolithic import _extract_word_metadata, _extract_excel_metadata, _extract_powerpoint_metadata, _extract_basic_metadata + from ..utils import _extract_word_metadata, _extract_excel_metadata, _extract_powerpoint_metadata, _extract_basic_metadata # Get basic metadata first metadata = await _extract_basic_metadata(file_path, extension, category) @@ -339,5 +339,5 @@ class UniversalMixin(MCPMixin): async def _extract_basic_metadata(self, file_path: str, extension: str, category: str) -> dict[str, Any]: """Extract basic metadata common to all documents.""" - from ..server_monolithic import _extract_basic_metadata + from ..utils import _extract_basic_metadata return await _extract_basic_metadata(file_path, extension, category) \ No newline at end of file diff --git a/src/mcp_office_tools/mixins/word.py b/src/mcp_office_tools/mixins/word.py index e1ad1ca..c8f4d62 100644 --- a/src/mcp_office_tools/mixins/word.py +++ b/src/mcp_office_tools/mixins/word.py @@ -225,17 +225,17 @@ class WordMixin(MCPMixin): # Helper methods - import from monolithic server async def _analyze_document_size(self, file_path: str, extension: str) -> dict[str, Any]: """Analyze document size for processing recommendations.""" - from ..server_monolithic import _analyze_document_size + from ..utils import _analyze_document_size return await _analyze_document_size(file_path, extension) def _get_processing_recommendation(self, doc_analysis: dict[str, Any], page_range: str, summary_only: bool) -> dict[str, Any]: """Get processing recommendations based on document analysis.""" - from ..server_monolithic import _get_processing_recommendation + from ..utils import _get_processing_recommendation return _get_processing_recommendation(doc_analysis, page_range, summary_only) def _parse_page_range(self, page_range: str) -> list[int]: """Parse page range string into list of page numbers.""" - from ..server_monolithic import _parse_page_range + 
from ..utils import _parse_page_range return _parse_page_range(page_range) async def _convert_docx_to_markdown( @@ -244,7 +244,7 @@ class WordMixin(MCPMixin): bookmark_name: str = "", chapter_name: str = "" ) -> dict[str, Any]: """Convert .docx to markdown.""" - from ..server_monolithic import _convert_docx_to_markdown + from ..utils import _convert_docx_to_markdown return await _convert_docx_to_markdown( file_path, include_images, image_mode, max_image_size, preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name @@ -255,7 +255,7 @@ class WordMixin(MCPMixin): preserve_structure: bool, page_numbers: list[int], summary_only: bool, output_dir: str ) -> dict[str, Any]: """Convert legacy .doc to markdown.""" - from ..server_monolithic import _convert_doc_to_markdown + from ..utils import _convert_doc_to_markdown return await _convert_doc_to_markdown( file_path, include_images, image_mode, max_image_size, preserve_structure, page_numbers, summary_only, output_dir diff --git a/src/mcp_office_tools/server_legacy.py b/src/mcp_office_tools/server_legacy.py deleted file mode 100644 index 5f85e58..0000000 --- a/src/mcp_office_tools/server_legacy.py +++ /dev/null @@ -1,2209 +0,0 @@ -"""MCP Office Tools Server - Comprehensive Microsoft Office document processing. - -FastMCP server providing 30+ tools for processing Word, Excel, PowerPoint documents -including both modern formats (.docx, .xlsx, .pptx) and legacy formats (.doc, .xls, .ppt). -""" - -import os -import tempfile -import time -from pathlib import Path -from typing import Any - -from fastmcp import FastMCP -from pydantic import Field - -from .utils import ( - OfficeFileError, - classify_document_type, - detect_format, - get_supported_extensions, - resolve_office_file_path, - validate_office_file, -) - -# Initialize FastMCP app -app = FastMCP("MCP Office Tools") - -# Configuration -TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir()) -DEBUG = os.environ.get("DEBUG", "false").lower() == "true" - - -@app.tool() -async def extract_text( - file_path: str = Field(description="Path to Office document or URL"), - preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"), - include_metadata: bool = Field(default=True, description="Include document metadata in output"), - method: str = Field(default="auto", description="Extraction method: auto, primary, fallback") -) -> dict[str, Any]: - """Extract text content from Office documents with intelligent method selection. - - Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt), - and CSV files. Uses multi-library fallback for maximum compatibility. 
- """ - start_time = time.time() - - try: - # Resolve file path (download if URL) - local_path = await resolve_office_file_path(file_path) - - # Validate file - validation = await validate_office_file(local_path) - if not validation["is_valid"]: - raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - - # Get format info - format_info = await detect_format(local_path) - category = format_info["category"] - extension = format_info["extension"] - - # Route to appropriate extraction method - if category == "word": - text_result = await _extract_word_text(local_path, extension, preserve_formatting, method) - elif category == "excel": - text_result = await _extract_excel_text(local_path, extension, preserve_formatting, method) - elif category == "powerpoint": - text_result = await _extract_powerpoint_text(local_path, extension, preserve_formatting, method) - else: - raise OfficeFileError(f"Unsupported document category: {category}") - - # Compile results - result = { - "text": text_result["text"], - "method_used": text_result["method_used"], - "character_count": len(text_result["text"]), - "word_count": len(text_result["text"].split()) if text_result["text"] else 0, - "extraction_time": round(time.time() - start_time, 3), - "format_info": { - "format": format_info["format_name"], - "category": category, - "is_legacy": format_info["is_legacy"] - } - } - - if include_metadata: - result["metadata"] = await _extract_basic_metadata(local_path, extension, category) - - if preserve_formatting: - result["formatted_sections"] = text_result.get("formatted_sections", []) - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Text extraction failed: {str(e)}") - - -@app.tool() -async def extract_images( - file_path: str = Field(description="Path to Office document or URL"), - output_format: str = Field(default="png", description="Output image format: png, jpg, jpeg"), - min_width: int = Field(default=100, description="Minimum image width in pixels"), - min_height: int = Field(default=100, description="Minimum image height in pixels"), - include_metadata: bool = Field(default=True, description="Include image metadata") -) -> dict[str, Any]: - """Extract images from Office documents with size filtering and format conversion.""" - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Validate file - validation = await validate_office_file(local_path) - if not validation["is_valid"]: - raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - - # Get format info - format_info = await detect_format(local_path) - category = format_info["category"] - extension = format_info["extension"] - - # Extract images based on format - if category == "word": - images = await _extract_word_images(local_path, extension, output_format, min_width, min_height) - elif category == "excel": - images = await _extract_excel_images(local_path, extension, output_format, min_width, min_height) - elif category == "powerpoint": - images = await _extract_powerpoint_images(local_path, extension, output_format, min_width, min_height) - else: - raise OfficeFileError(f"Image extraction not supported for category: {category}") - - result = { - "images": images, - "image_count": len(images), - "extraction_time": round(time.time() - start_time, 3), - "format_info": { - "format": format_info["format_name"], - "category": category - } - } - - if include_metadata: - 
result["total_size_bytes"] = sum(img.get("size_bytes", 0) for img in images) - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Image extraction failed: {str(e)}") - - -@app.tool() -async def extract_metadata( - file_path: str = Field(description="Path to Office document or URL") -) -> dict[str, Any]: - """Extract comprehensive metadata from Office documents.""" - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Validate file - validation = await validate_office_file(local_path) - if not validation["is_valid"]: - raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - - # Get format info - format_info = await detect_format(local_path) - category = format_info["category"] - extension = format_info["extension"] - - # Extract metadata based on format - if category == "word": - metadata = await _extract_word_metadata(local_path, extension) - elif category == "excel": - metadata = await _extract_excel_metadata(local_path, extension) - elif category == "powerpoint": - metadata = await _extract_powerpoint_metadata(local_path, extension) - else: - metadata = {"category": category, "basic_info": "Limited metadata available"} - - # Add file system metadata - path = Path(local_path) - stat = path.stat() - - result = { - "document_metadata": metadata, - "file_metadata": { - "filename": path.name, - "file_size": stat.st_size, - "created": stat.st_ctime, - "modified": stat.st_mtime, - "extension": extension - }, - "format_info": format_info, - "extraction_time": round(time.time() - start_time, 3) - } - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Metadata extraction failed: {str(e)}") - - -@app.tool() -async def detect_office_format( - file_path: str = Field(description="Path to Office document or URL") -) -> dict[str, Any]: - """Intelligent Office document format detection and analysis.""" - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Detect format - format_info = await detect_format(local_path) - - # Classify document - classification = await classify_document_type(local_path) - - result = { - "format_detection": format_info, - "document_classification": classification, - "supported": format_info["is_supported"], - "processing_recommendations": format_info.get("processing_hints", []), - "detection_time": round(time.time() - start_time, 3) - } - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Format detection failed: {str(e)}") - - -@app.tool() -async def analyze_document_health( - file_path: str = Field(description="Path to Office document or URL") -) -> dict[str, Any]: - """Comprehensive document health and integrity analysis.""" - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Validate file thoroughly - validation = await validate_office_file(local_path) - - # Get format info - format_info = await detect_format(local_path) - - # Health assessment - health_score = _calculate_health_score(validation, format_info) - - result = { - "overall_health": "healthy" if validation["is_valid"] and health_score >= 8 else - "warning" if health_score >= 5 else "problematic", - "health_score": health_score, - "validation_results": validation, - 
"format_analysis": format_info, - "recommendations": _get_health_recommendations(validation, format_info), - "analysis_time": round(time.time() - start_time, 3) - } - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Health analysis failed: {str(e)}") - - -@app.tool() -async def convert_to_markdown( - file_path: str = Field(description="Path to Office document or URL"), - include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"), - image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"), - max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"), - preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"), - page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). RECOMMENDED for large documents. Empty = all pages"), - bookmark_name: str = Field(default="", description="Extract content for a specific bookmark/chapter (e.g., 'Chapter1_Start'). More reliable than page ranges."), - chapter_name: str = Field(default="", description="Extract content for a chapter by heading text (e.g., 'Chapter 1', 'Introduction'). Works when bookmarks aren't available."), - summary_only: bool = Field(default=False, description="Return only metadata and truncated summary. STRONGLY RECOMMENDED for large docs (>10 pages)"), - output_dir: str = Field(default="", description="Output directory for image files (if image_mode='files')") -) -> dict[str, Any]: - """Convert Office documents to Markdown format with intelligent processing recommendations. - - ⚠️ RECOMMENDED WORKFLOW FOR LARGE DOCUMENTS (>5 pages): - 1. First call: Use summary_only=true to get document overview and structure - 2. Then: Use page_range (e.g., "1-10", "15-25") to process specific sections - - This prevents response size errors and provides efficient processing. - Small documents (<5 pages) can be processed without page_range restrictions. 
- """ - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Validate file - validation = await validate_office_file(local_path) - if not validation["is_valid"]: - raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - - # Get format info - format_info = await detect_format(local_path) - category = format_info["category"] - extension = format_info["extension"] - - # Currently focused on Word documents for markdown conversion - if category != "word": - raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}") - - # Analyze document size and provide intelligent recommendations - doc_analysis = await _analyze_document_size(local_path, extension) - processing_recommendation = _get_processing_recommendation( - doc_analysis, page_range, summary_only - ) - - # Parse page range if provided - page_numbers = _parse_page_range(page_range) if page_range else None - - # Prioritize bookmark/chapter extraction over page ranges - if bookmark_name or chapter_name: - page_numbers = None # Ignore page ranges when bookmark or chapter is specified - - # Convert to markdown based on format - if extension == ".docx": - markdown_result = await _convert_docx_to_markdown( - local_path, include_images, image_mode, max_image_size, - preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name - ) - else: # .doc - # For legacy .doc files, use mammoth if available - markdown_result = await _convert_doc_to_markdown( - local_path, include_images, image_mode, max_image_size, - preserve_structure, page_numbers, summary_only, output_dir - ) - - # Build result based on mode - result = { - "metadata": { - "original_file": os.path.basename(local_path), - "format": format_info["format_name"], - "conversion_method": markdown_result["method_used"], - "conversion_time": round(time.time() - start_time, 3), - "summary_only": summary_only, - "document_analysis": doc_analysis, - "processing_recommendation": processing_recommendation - } - } - - # Add page range info if used - if page_range: - result["metadata"]["page_range"] = page_range - result["metadata"]["pages_processed"] = len(page_numbers) if page_numbers else 0 - - # Add content based on mode - if summary_only: - # VERY restrictive summary mode to prevent massive responses - result["metadata"]["character_count"] = len(markdown_result["content"]) - result["metadata"]["word_count"] = len(markdown_result["content"].split()) - - # Ultra-short summary (only 500 chars max) - result["summary"] = markdown_result["content"][:500] + "..." if len(markdown_result["content"]) > 500 else markdown_result["content"] - - # Severely limit table of contents to prevent 1M+ token responses - if "table_of_contents" in markdown_result: - toc = markdown_result["table_of_contents"] - if "sections" in toc and len(toc["sections"]) > 20: - # Limit to first 20 sections only - limited_toc = { - "sections": toc["sections"][:20], - "total_sections": len(toc["sections"]), - "showing_first": 20, - "note": f"Showing first 20 of {len(toc['sections'])} sections. 
Use page_range to extract specific sections.", - "suggested_chunking": toc.get("suggested_chunking", [])[:10] # Limit chunking suggestions too - } - result["table_of_contents"] = limited_toc - else: - result["table_of_contents"] = toc - else: - # Include content with automatic size limiting to prevent MCP errors - content = markdown_result["content"] - - # Apply aggressive content limiting to stay under 25k token limit - # Rough estimate: ~4 chars per token, leave buffer for metadata - max_content_chars = 80000 # ~20k tokens worth of content - - if len(content) > max_content_chars: - # Truncate but try to preserve structure - truncated_content = _smart_truncate_content(content, max_content_chars) - result["markdown"] = truncated_content - result["content_truncated"] = True - result["original_length"] = len(content) - result["truncated_length"] = len(truncated_content) - result["truncation_note"] = f"Content truncated to stay under MCP 25k token limit. Original: {len(content):,} chars, Shown: {len(truncated_content):,} chars. Use smaller page ranges for full content." - else: - result["markdown"] = content - result["content_truncated"] = False - - result["metadata"]["character_count"] = len(content) - result["metadata"]["word_count"] = len(content.split()) - - # Add image info - if include_images and markdown_result.get("images"): - result["images"] = markdown_result["images"] - result["metadata"]["image_count"] = len(markdown_result["images"]) - result["metadata"]["total_image_size"] = sum( - img.get("size_bytes", 0) for img in markdown_result["images"] - ) - - # Add structure info - if preserve_structure and markdown_result.get("structure"): - result["structure"] = markdown_result["structure"] - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Markdown conversion failed: {str(e)}") - - -@app.tool() -async def get_supported_formats() -> dict[str, Any]: - """Get list of all supported Office document formats and their capabilities.""" - extensions = get_supported_extensions() - - format_details = {} - for ext in extensions: - from .utils.validation import get_format_info - info = get_format_info(ext) - if info: - format_details[ext] = { - "format_name": info["format_name"], - "category": info["category"], - "mime_types": info["mime_types"] - } - - return { - "supported_extensions": extensions, - "format_details": format_details, - "categories": { - "word": [ext for ext, info in format_details.items() if info["category"] == "word"], - "excel": [ext for ext, info in format_details.items() if info["category"] == "excel"], - "powerpoint": [ext for ext, info in format_details.items() if info["category"] == "powerpoint"] - }, - "total_formats": len(extensions) - } - - -# Helper functions for text extraction -async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: - """Extract text from Word documents with fallback methods.""" - methods_tried = [] - - # Method selection - if method == "auto": - if extension == ".docx": - method_order = ["python-docx", "mammoth", "docx2txt"] - else: # .doc - method_order = ["olefile", "mammoth", "docx2txt"] - elif method == "primary": - method_order = ["python-docx"] if extension == ".docx" else ["olefile"] - else: # fallback - method_order = ["mammoth", "docx2txt"] - - text = "" - formatted_sections = [] - method_used = None - - for method_name in method_order: - try: - methods_tried.append(method_name) - - if method_name == 
"python-docx" and extension == ".docx": - import docx - doc = docx.Document(file_path) - - paragraphs = [] - for para in doc.paragraphs: - paragraphs.append(para.text) - if preserve_formatting: - formatted_sections.append({ - "type": "paragraph", - "text": para.text, - "style": para.style.name if para.style else None - }) - - text = "\n".join(paragraphs) - method_used = "python-docx" - break - - elif method_name == "mammoth": - import mammoth - - with open(file_path, "rb") as docx_file: - if preserve_formatting: - result = mammoth.convert_to_html(docx_file) - text = result.value - formatted_sections.append({ - "type": "html", - "content": result.value - }) - else: - result = mammoth.extract_raw_text(docx_file) - text = result.value - - method_used = "mammoth" - break - - elif method_name == "docx2txt": - import docx2txt - text = docx2txt.process(file_path) - method_used = "docx2txt" - break - - elif method_name == "olefile" and extension == ".doc": - # Basic text extraction for legacy .doc files - try: - import olefile - if olefile.isOleFile(file_path): - # This is a simplified approach - real .doc parsing is complex - with open(file_path, 'rb') as f: - content = f.read() - # Very basic text extraction attempt - text = content.decode('utf-8', errors='ignore') - # Clean up binary artifacts - import re - text = re.sub(r'[^\x20-\x7E\n\r\t]', '', text) - text = '\n'.join(line.strip() for line in text.split('\n') if line.strip()) - method_used = "olefile" - break - except Exception: - continue - - except ImportError: - continue - except Exception: - continue - - if not method_used: - raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}") - - return { - "text": text, - "method_used": method_used, - "methods_tried": methods_tried, - "formatted_sections": formatted_sections - } - - -async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: - """Extract text from Excel documents.""" - methods_tried = [] - - if extension == ".csv": - # CSV handling - import pandas as pd - try: - df = pd.read_csv(file_path) - text = df.to_string() - return { - "text": text, - "method_used": "pandas", - "methods_tried": ["pandas"], - "formatted_sections": [{"type": "table", "data": df.to_dict()}] if preserve_formatting else [] - } - except Exception as e: - raise OfficeFileError(f"CSV processing failed: {str(e)}") - - # Excel file handling - text = "" - formatted_sections = [] - method_used = None - - method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"] - - for method_name in method_order: - try: - methods_tried.append(method_name) - - if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]: - import openpyxl - wb = openpyxl.load_workbook(file_path, data_only=True) - - text_parts = [] - for sheet_name in wb.sheetnames: - ws = wb[sheet_name] - text_parts.append(f"Sheet: {sheet_name}") - - for row in ws.iter_rows(values_only=True): - row_text = "\t".join(str(cell) if cell is not None else "" for cell in row) - if row_text.strip(): - text_parts.append(row_text) - - if preserve_formatting: - formatted_sections.append({ - "type": "worksheet", - "name": sheet_name, - "data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()] - }) - - text = "\n".join(text_parts) - method_used = "openpyxl" - break - - elif method_name == "pandas": - import pandas as pd - - if extension in [".xlsx", ".xlsm"]: - dfs = 
pd.read_excel(file_path, sheet_name=None) - else: # .xls - dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd') - - text_parts = [] - for sheet_name, df in dfs.items(): - text_parts.append(f"Sheet: {sheet_name}") - text_parts.append(df.to_string()) - - if preserve_formatting: - formatted_sections.append({ - "type": "dataframe", - "name": sheet_name, - "data": df.to_dict() - }) - - text = "\n\n".join(text_parts) - method_used = "pandas" - break - - elif method_name == "xlrd" and extension == ".xls": - import xlrd - wb = xlrd.open_workbook(file_path) - - text_parts = [] - for sheet in wb.sheets(): - text_parts.append(f"Sheet: {sheet.name}") - - for row_idx in range(sheet.nrows): - row = sheet.row_values(row_idx) - row_text = "\t".join(str(cell) for cell in row) - text_parts.append(row_text) - - text = "\n".join(text_parts) - method_used = "xlrd" - break - - except ImportError: - continue - except Exception: - continue - - if not method_used: - raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}") - - return { - "text": text, - "method_used": method_used, - "methods_tried": methods_tried, - "formatted_sections": formatted_sections - } - - -async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: - """Extract text from PowerPoint documents.""" - methods_tried = [] - - if extension == ".pptx": - try: - import pptx - prs = pptx.Presentation(file_path) - - text_parts = [] - formatted_sections = [] - - for slide_num, slide in enumerate(prs.slides, 1): - slide_text_parts = [] - - for shape in slide.shapes: - if hasattr(shape, "text") and shape.text: - slide_text_parts.append(shape.text) - - slide_text = "\n".join(slide_text_parts) - text_parts.append(f"Slide {slide_num}:\n{slide_text}") - - if preserve_formatting: - formatted_sections.append({ - "type": "slide", - "number": slide_num, - "text": slide_text, - "shapes": len(slide.shapes) - }) - - text = "\n\n".join(text_parts) - - return { - "text": text, - "method_used": "python-pptx", - "methods_tried": ["python-pptx"], - "formatted_sections": formatted_sections - } - - except ImportError: - methods_tried.append("python-pptx") - except Exception: - methods_tried.append("python-pptx") - - # Legacy .ppt handling would require additional libraries - if extension == ".ppt": - raise OfficeFileError("Legacy PowerPoint (.ppt) text extraction requires additional setup") - - raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}") - - -# Helper functions for image extraction -async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: - """Extract images from Word documents.""" - images = [] - - if extension == ".docx": - try: - import io - import zipfile - - from PIL import Image - - with zipfile.ZipFile(file_path, 'r') as zip_file: - # Look for images in media folder - image_files = [f for f in zip_file.namelist() if f.startswith('word/media/')] - - for i, img_path in enumerate(image_files): - try: - img_data = zip_file.read(img_path) - img = Image.open(io.BytesIO(img_data)) - - # Size filtering - if img.width >= min_width and img.height >= min_height: - # Save to temp file - temp_path = os.path.join(TEMP_DIR, f"word_image_{i}.{output_format}") - img.save(temp_path, format=output_format.upper()) - - images.append({ - "index": i, - "filename": os.path.basename(img_path), - "path": temp_path, - "width": img.width, - "height": 
img.height, - "format": img.format, - "size_bytes": len(img_data) - }) - except Exception: - continue - - except Exception as e: - raise OfficeFileError(f"Word image extraction failed: {str(e)}") - - return images - - -async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: - """Extract images from Excel documents.""" - images = [] - - if extension in [".xlsx", ".xlsm"]: - try: - import io - import zipfile - - from PIL import Image - - with zipfile.ZipFile(file_path, 'r') as zip_file: - # Look for images in media folder - image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')] - - for i, img_path in enumerate(image_files): - try: - img_data = zip_file.read(img_path) - img = Image.open(io.BytesIO(img_data)) - - # Size filtering - if img.width >= min_width and img.height >= min_height: - # Save to temp file - temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}") - img.save(temp_path, format=output_format.upper()) - - images.append({ - "index": i, - "filename": os.path.basename(img_path), - "path": temp_path, - "width": img.width, - "height": img.height, - "format": img.format, - "size_bytes": len(img_data) - }) - except Exception: - continue - - except Exception as e: - raise OfficeFileError(f"Excel image extraction failed: {str(e)}") - - return images - - -async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: - """Extract images from PowerPoint documents.""" - images = [] - - if extension == ".pptx": - try: - import io - import zipfile - - from PIL import Image - - with zipfile.ZipFile(file_path, 'r') as zip_file: - # Look for images in media folder - image_files = [f for f in zip_file.namelist() if f.startswith('ppt/media/')] - - for i, img_path in enumerate(image_files): - try: - img_data = zip_file.read(img_path) - img = Image.open(io.BytesIO(img_data)) - - # Size filtering - if img.width >= min_width and img.height >= min_height: - # Save to temp file - temp_path = os.path.join(TEMP_DIR, f"powerpoint_image_{i}.{output_format}") - img.save(temp_path, format=output_format.upper()) - - images.append({ - "index": i, - "filename": os.path.basename(img_path), - "path": temp_path, - "width": img.width, - "height": img.height, - "format": img.format, - "size_bytes": len(img_data) - }) - except Exception: - continue - - except Exception as e: - raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}") - - return images - - -# Helper functions for metadata extraction -async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]: - """Extract basic metadata from Office documents.""" - metadata = {"category": category, "extension": extension} - - try: - if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]: - import zipfile - - with zipfile.ZipFile(file_path, 'r') as zip_file: - # Core properties - if 'docProps/core.xml' in zip_file.namelist(): - zip_file.read('docProps/core.xml').decode('utf-8') - metadata["has_core_properties"] = True - - # App properties - if 'docProps/app.xml' in zip_file.namelist(): - zip_file.read('docProps/app.xml').decode('utf-8') - metadata["has_app_properties"] = True - - except Exception: - pass - - return metadata - - -async def _extract_word_metadata(file_path: str, extension: str) -> dict[str, Any]: - """Extract Word-specific metadata.""" - metadata = {"type": "word", 
"extension": extension} - - if extension == ".docx": - try: - import docx - doc = docx.Document(file_path) - - core_props = doc.core_properties - metadata.update({ - "title": core_props.title, - "author": core_props.author, - "subject": core_props.subject, - "keywords": core_props.keywords, - "comments": core_props.comments, - "created": str(core_props.created) if core_props.created else None, - "modified": str(core_props.modified) if core_props.modified else None - }) - - # Document structure - metadata.update({ - "paragraph_count": len(doc.paragraphs), - "section_count": len(doc.sections), - "has_tables": len(doc.tables) > 0, - "table_count": len(doc.tables) - }) - - except Exception: - pass - - return metadata - - -async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]: - """Extract Excel-specific metadata.""" - metadata = {"type": "excel", "extension": extension} - - if extension in [".xlsx", ".xlsm"]: - try: - import openpyxl - wb = openpyxl.load_workbook(file_path) - - props = wb.properties - metadata.update({ - "title": props.title, - "creator": props.creator, - "subject": props.subject, - "description": props.description, - "keywords": props.keywords, - "created": str(props.created) if props.created else None, - "modified": str(props.modified) if props.modified else None - }) - - # Workbook structure - metadata.update({ - "worksheet_count": len(wb.worksheets), - "worksheet_names": wb.sheetnames, - "has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets), - "has_images": any(len(ws._images) > 0 for ws in wb.worksheets) - }) - - except Exception: - pass - - return metadata - - -async def _extract_powerpoint_metadata(file_path: str, extension: str) -> dict[str, Any]: - """Extract PowerPoint-specific metadata.""" - metadata = {"type": "powerpoint", "extension": extension} - - if extension == ".pptx": - try: - import pptx - prs = pptx.Presentation(file_path) - - core_props = prs.core_properties - metadata.update({ - "title": core_props.title, - "author": core_props.author, - "subject": core_props.subject, - "keywords": core_props.keywords, - "comments": core_props.comments, - "created": str(core_props.created) if core_props.created else None, - "modified": str(core_props.modified) if core_props.modified else None - }) - - # Presentation structure - slide_layouts = set() - total_shapes = 0 - - for slide in prs.slides: - slide_layouts.add(slide.slide_layout.name) - total_shapes += len(slide.shapes) - - metadata.update({ - "slide_count": len(prs.slides), - "slide_layouts": list(slide_layouts), - "total_shapes": total_shapes, - "slide_width": prs.slide_width, - "slide_height": prs.slide_height - }) - - except Exception: - pass - - return metadata - - -def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int: - """Calculate document health score (1-10).""" - score = 10 - - # Deduct for validation errors - if not validation["is_valid"]: - score -= 5 - - if validation["errors"]: - score -= len(validation["errors"]) * 2 - - if validation["warnings"]: - score -= len(validation["warnings"]) - - # Deduct for problematic characteristics - if validation.get("password_protected"): - score -= 1 - - if format_info.get("is_legacy"): - score -= 1 - - structure = format_info.get("structure", {}) - if structure.get("estimated_complexity") == "complex": - score -= 1 - - return max(1, min(10, score)) - - -def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]: - """Get health improvement 
recommendations.""" - recommendations = [] - - if validation["errors"]: - recommendations.append("Fix validation errors before processing") - - if validation.get("password_protected"): - recommendations.append("Remove password protection if possible") - - if format_info.get("is_legacy"): - recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)") - - structure = format_info.get("structure", {}) - if structure.get("estimated_complexity") == "complex": - recommendations.append("Complex document may require specialized processing") - - if not recommendations: - recommendations.append("Document appears healthy and ready for processing") - - return recommendations - - -# Markdown conversion helper functions -async def _convert_docx_to_markdown( - file_path: str, - include_images: bool, - image_mode: str, - max_image_size: int, - preserve_structure: bool, - page_numbers: list[int], - summary_only: bool, - output_dir: str, - bookmark_name: str = "", - chapter_name: str = "" -) -> dict[str, Any]: - """Convert .docx file to markdown with comprehensive feature support.""" - import base64 - - # ULTRA-FAST summary mode - skip all complex processing - if summary_only: - return await _get_ultra_fast_summary(file_path) - - # If page_numbers, bookmark_name, or chapter_name is specified, we need to use python-docx for targeted extraction - # as mammoth processes the entire document - if page_numbers or bookmark_name or chapter_name: - return await _convert_docx_with_python_docx( - file_path, include_images, image_mode, max_image_size, - preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name - ) - - try: - # Try mammoth first for better HTML->Markdown conversion (full document only) - import mammoth - - # Configure mammoth for markdown-friendly output - with open(file_path, "rb") as docx_file: - if include_images: - # Extract images and handle them based on mode - images_info = [] - - def convert_image(image): - image_data = image.open() - content_type = image.content_type - ext = content_type.split('/')[-1] if '/' in content_type else 'png' - - if image_mode == "base64": - if len(image_data) <= max_image_size: - encoded = base64.b64encode(image_data).decode('utf-8') - images_info.append({ - "filename": f"image_{len(images_info)}.{ext}", - "content_type": content_type, - "size_bytes": len(image_data), - "mode": "base64" - }) - return { - "src": f"data:{content_type};base64,{encoded}" - } - else: - # Too large for base64, fall back to reference - filename = f"large_image_{len(images_info)}.{ext}" - images_info.append({ - "filename": filename, - "content_type": content_type, - "size_bytes": len(image_data), - "mode": "reference", - "note": "Too large for base64 encoding" - }) - return {"src": filename} - - elif image_mode == "files": - # Save image to file - nonlocal output_dir - if not output_dir: - output_dir = os.path.join(TEMP_DIR, "markdown_images") - - os.makedirs(output_dir, exist_ok=True) - filename = f"image_{len(images_info)}.{ext}" - file_path = os.path.join(output_dir, filename) - - with open(file_path, 'wb') as img_file: - img_file.write(image_data) - - images_info.append({ - "filename": filename, - "file_path": file_path, - "content_type": content_type, - "size_bytes": len(image_data), - "mode": "file" - }) - return {"src": file_path} - - else: # references - filename = f"image_{len(images_info)}.{ext}" - images_info.append({ - "filename": filename, - "content_type": content_type, - "size_bytes": len(image_data), - "mode": "reference" - 
}) - return {"src": filename} - - # Convert with image handling - result = mammoth.convert_to_html( - docx_file, - convert_image=mammoth.images.img_element(convert_image) - ) - - html_content = result.value - markdown_content = _html_to_markdown(html_content, preserve_structure) - - conversion_result = { - "content": markdown_content, - "method_used": "mammoth-with-images", - "images": images_info - } - - else: - # Convert without images - result = mammoth.convert_to_markdown(docx_file) - markdown_content = result.value - - conversion_result = { - "content": markdown_content, - "method_used": "mammoth-markdown", - "images": [] - } - - # Handle summary mode - if summary_only and len(markdown_content) > 5000: - # For summary mode, truncate large content - markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]" - - # Update the conversion result - conversion_result["content"] = markdown_content - - # Extract structure information - if preserve_structure: - structure = _extract_markdown_structure(markdown_content) - conversion_result["structure"] = structure - - return conversion_result - - except ImportError: - # Fall back to python-docx with custom markdown conversion - return await _convert_docx_with_python_docx( - file_path, include_images, image_mode, max_image_size, - preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name - ) - except Exception: - # Fall back to python-docx - return await _convert_docx_with_python_docx( - file_path, include_images, image_mode, max_image_size, - preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name - ) - - -async def _convert_docx_with_python_docx( - file_path: str, - include_images: bool, - image_mode: str, - max_image_size: int, - preserve_structure: bool, - page_numbers: list[int], - summary_only: bool, - output_dir: str, - bookmark_name: str = "", - chapter_name: str = "" -) -> dict[str, Any]: - """Convert .docx using python-docx with custom markdown conversion.""" - import base64 - - import docx - from docx.oxml.table import CT_Tbl - from docx.oxml.text.paragraph import CT_P - from docx.table import Table - from docx.text.paragraph import Paragraph - - doc = docx.Document(file_path) - markdown_parts = [] - images_info = [] - structure_info = {"headings": [], "tables": 0, "lists": 0, "paragraphs": 0} - - # Extract images if requested - if include_images: - extracted_images = await _extract_word_images(file_path, ".docx", "png", 1, 1) - for i, img in enumerate(extracted_images): - if image_mode == "base64": - if img.get("size_bytes", 0) <= max_image_size: - with open(img["path"], "rb") as img_file: - img_data = img_file.read() - encoded = base64.b64encode(img_data).decode('utf-8') - images_info.append({ - "filename": img["filename"], - "content_type": f"image/{img.get('format', 'png').lower()}", - "size_bytes": img.get("size_bytes", 0), - "mode": "base64", - "markdown_ref": f"![Image {i+1}](data:image/{img.get('format', 'png').lower()};base64,{encoded})" - }) - else: - images_info.append({ - "filename": img["filename"], - "size_bytes": img.get("size_bytes", 0), - "mode": "reference", - "markdown_ref": f"![Image {i+1}]({img['filename']})", - "note": "Too large for base64 encoding" - }) - elif image_mode == "files": - images_info.append({ - "filename": img["filename"], - "file_path": img["path"], - "size_bytes": img.get("size_bytes", 0), - "mode": "file", - "markdown_ref": f"![Image {i+1}]({img['path']})" - }) - else: # references - 
images_info.append({ - "filename": img["filename"], - "size_bytes": img.get("size_bytes", 0), - "mode": "reference", - "markdown_ref": f"![Image {i+1}]({img['filename']})" - }) - - # Handle bookmark-based, chapter-based, or page-based extraction vs full document - if bookmark_name: - # For bookmark extraction, find the bookmark boundaries - bookmark_range = await _find_bookmark_content_range(doc, bookmark_name) - if not bookmark_range: - return { - "content": f"Bookmark '{bookmark_name}' not found in document", - "method_used": "python-docx-bookmark-not-found", - "images": [], - "bookmark_error": True - } - max_paragraphs = 500 # Generous limit for bookmark sections - max_chars = 100000 - chapter_range = None - elif chapter_name: - # For chapter extraction, find the heading boundaries - chapter_range = await _find_chapter_content_range(doc, chapter_name) - if not chapter_range: - return { - "content": f"Chapter '{chapter_name}' not found in document. Available headings will be listed in processing_limits.", - "method_used": "python-docx-chapter-not-found", - "images": [], - "chapter_error": True, - "available_headings": await _get_available_headings(doc) - } - max_paragraphs = 500 # Generous limit for chapter sections - max_chars = 100000 - bookmark_range = None - elif page_numbers: - # For page ranges, severely limit content extraction - max_pages_requested = max(page_numbers) if page_numbers else 1 - # Rough estimate: ~20-30 paragraphs per page - max_paragraphs = min(max_pages_requested * 25, 100) # Cap at 100 paragraphs max - max_chars = min(max_pages_requested * 8000, 40000) # Cap at 40k chars max - bookmark_range = None - chapter_range = None - else: - max_paragraphs = 1000 # Large limit for full document - max_chars = 200000 - bookmark_range = None - chapter_range = None - - current_page = 1 - processed_paragraphs = 0 - total_chars = 0 - include_current_page = not page_numbers or current_page in page_numbers - table_of_contents = [] # Track headings with page numbers for TOC - - for element_idx, element in enumerate(doc.element.body): - # Early termination if we've processed enough content - if processed_paragraphs >= max_paragraphs or total_chars >= max_chars: - break - - # Skip elements outside bookmark/chapter range if targeted extraction is used - if bookmark_range and not (bookmark_range['start_idx'] <= element_idx <= bookmark_range['end_idx']): - continue - if chapter_range and not (chapter_range['start_idx'] <= element_idx <= chapter_range['end_idx']): - continue - - if isinstance(element, CT_P): - paragraph = Paragraph(element, doc) - - # Check for page breaks - if _has_page_break(paragraph): - current_page += 1 - include_current_page = not page_numbers or current_page in page_numbers - continue - - # Process content with strict limits - markdown_text = _paragraph_to_markdown(paragraph, preserve_structure) - if markdown_text.strip(): - # Check if adding this would exceed limits - text_length = len(markdown_text) - if total_chars + text_length > max_chars: - break # Stop processing - - markdown_parts.append(markdown_text) - processed_paragraphs += 1 - total_chars += text_length - structure_info["paragraphs"] += 1 - - # Track headings for both structure and TOC - if preserve_structure and markdown_text.startswith('#'): - level = len(markdown_text) - len(markdown_text.lstrip('#')) - heading_text = markdown_text.lstrip('# ').strip() - heading_info = { - "level": level, - "text": heading_text, - "position": len(markdown_parts) - 1, - "page": current_page - } - 
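# Illustrative sketch only: _has_page_break is called in the loop above but is not defined anywhere
# in this diff. One plausible implementation inspects the paragraph's underlying XML for explicit or
# last-rendered page breaks; the XPath below relies on python-docx's namespace-aware element.xpath
# and is an assumption about the original helper, not a copy of it.
def _has_page_break_sketch(paragraph) -> bool:
    """Return True if the paragraph contains an explicit or last-rendered page break."""
    p = paragraph._p  # underlying CT_P element
    if p.xpath('.//w:br[@w:type="page"]'):
        return True
    return bool(p.xpath('.//w:lastRenderedPageBreak'))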
structure_info["headings"].append(heading_info) - - # Add to table of contents - table_of_contents.append({ - "level": level, - "title": heading_text, - "page": current_page, - "suggested_page_range": f"{current_page}-{current_page + _estimate_section_length(level)}" - }) - - elif isinstance(element, CT_Tbl): - # Process tables with strict limits - if processed_paragraphs < max_paragraphs and total_chars < max_chars: - table = Table(element, doc) - table_markdown = _table_to_markdown(table) - if table_markdown.strip(): - table_length = len(table_markdown) - if total_chars + table_length > max_chars: - break # Stop processing - - markdown_parts.append(table_markdown) - total_chars += table_length - structure_info["tables"] += 1 - - # Add image references at the end if any - if include_images and images_info: - markdown_parts.append("\n## Images\n") - for img in images_info: - markdown_parts.append(img["markdown_ref"]) - - markdown_content = "\n\n".join(markdown_parts) - - result = { - "content": markdown_content, - "method_used": "python-docx-custom", - "images": images_info - } - - # Add table of contents for navigation - if table_of_contents: - result["table_of_contents"] = _optimize_toc_page_ranges(table_of_contents) - - # Add processing limits info - result["processing_limits"] = { - "max_paragraphs_allowed": max_paragraphs, - "max_chars_allowed": max_chars, - "paragraphs_processed": processed_paragraphs, - "chars_processed": total_chars, - "content_truncated": processed_paragraphs >= max_paragraphs or total_chars >= max_chars, - "note": f"Processed {processed_paragraphs}/{max_paragraphs} paragraphs, {total_chars:,}/{max_chars:,} chars" - } - - # Add extraction method info - if bookmark_name and bookmark_range: - result["bookmark_extraction"] = { - "bookmark_name": bookmark_name, - "elements_range": f"{bookmark_range['start_idx']}-{bookmark_range['end_idx']}", - "extraction_note": bookmark_range["note"] - } - elif chapter_name and chapter_range: - result["chapter_extraction"] = { - "chapter_name": chapter_name, - "elements_range": f"{chapter_range['start_idx']}-{chapter_range['end_idx']}", - "extraction_note": chapter_range["note"] - } - elif page_numbers: - result["pages_processed"] = page_numbers - result["total_pages_in_range"] = len(page_numbers) - - # Handle summary mode - if summary_only and len(markdown_content) > 5000: - markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]" - - # Update the result content - result["content"] = markdown_content - - # Add structure info - if preserve_structure: - result["structure"] = structure_info - - return result - - -async def _convert_doc_to_markdown( - file_path: str, - include_images: bool, - image_mode: str, - max_image_size: int, - preserve_structure: bool, - page_numbers: list[int], - summary_only: bool, - output_dir: str -) -> dict[str, Any]: - """Convert legacy .doc file to markdown using available methods.""" - try: - import mammoth - - with open(file_path, "rb") as doc_file: - result = mammoth.convert_to_markdown(doc_file) - markdown_content = result.value - - conversion_result = { - "content": markdown_content, - "method_used": "mammoth-doc", - "images": [] # Legacy .doc image extraction is complex - } - - # Handle summary mode - if summary_only and len(markdown_content) > 5000: - markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]" - - # Update the conversion result - conversion_result["content"] = 
markdown_content - - if preserve_structure: - structure = _extract_markdown_structure(markdown_content) - conversion_result["structure"] = structure - - return conversion_result - - except ImportError: - raise OfficeFileError("Legacy .doc conversion requires mammoth library") - except Exception as e: - raise OfficeFileError(f"Legacy .doc conversion failed: {str(e)}") - - -def _paragraph_to_markdown(paragraph, preserve_structure: bool) -> str: - """Convert a Word paragraph to markdown format.""" - text = paragraph.text.strip() - if not text: - return "" - - if not preserve_structure: - return text - - # Handle different paragraph styles - style_name = paragraph.style.name.lower() if paragraph.style else "" - - if "heading" in style_name: - # Extract heading level from style name - import re - level_match = re.search(r'(\d+)', style_name) - level = int(level_match.group(1)) if level_match else 1 - return f"{'#' * level} {text}" - elif "title" in style_name: - return f"# {text}" - elif "subtitle" in style_name: - return f"## {text}" - elif style_name in ["list paragraph", "list"]: - return f"- {text}" - elif "quote" in style_name: - return f"> {text}" - else: - return text - - -def _table_to_markdown(table) -> str: - """Convert a Word table to markdown format.""" - markdown_rows = [] - - for i, row in enumerate(table.rows): - cells = [cell.text.strip().replace('\n', ' ') for cell in row.cells] - markdown_row = "| " + " | ".join(cells) + " |" - markdown_rows.append(markdown_row) - - # Add header separator after first row - if i == 0: - separator = "| " + " | ".join(["---"] * len(cells)) + " |" - markdown_rows.append(separator) - - return "\n".join(markdown_rows) - - -def _html_to_markdown(html_content: str, preserve_structure: bool) -> str: - """Convert HTML content to markdown format.""" - import re - - # Basic HTML to Markdown conversions - conversions = [ - (r'<h1[^>]*>(.*?)</h1>', r'# \1'), - (r'<h2[^>]*>(.*?)</h2>', r'## \1'), - (r'<h3[^>]*>(.*?)</h3>', r'### \1'), - (r'<h4[^>]*>(.*?)</h4>', r'#### \1'), - (r'<h5[^>]*>(.*?)</h5>', r'##### \1'), - (r'<h6[^>]*>(.*?)</h6>', r'###### \1'), - (r'<strong[^>]*>(.*?)</strong>', r'**\1**'), - (r'<b[^>]*>(.*?)</b>', r'**\1**'), - (r'<em[^>]*>(.*?)</em>', r'*\1*'), - (r'<i[^>]*>(.*?)</i>', r'*\1*'), - (r'<code[^>]*>(.*?)</code>', r'`\1`'), - (r'<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', r'[\2](\1)'), - (r'<img[^>]*src="([^"]*)"[^>]*/?>', r'![](\1)'), - (r'<p[^>]*>(.*?)</p>', r'\1\n'), - (r'<br[^>]*/?>', r'\n'), - (r'<li[^>]*>(.*?)</li>', r'- \1'), - (r'<ul[^>]*>(.*?)</ul>', r'\1'), - (r'<ol[^>]*>(.*?)</ol>', r'\1'), - (r'<blockquote[^>]*>(.*?)</blockquote>', r'> \1'), - ] - - markdown = html_content - for pattern, replacement in conversions: - markdown = re.sub(pattern, replacement, markdown, flags=re.DOTALL | re.IGNORECASE) - - # Clean up extra whitespace - markdown = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown) - markdown = re.sub(r'^\s+|\s+$', '', markdown, flags=re.MULTILINE) - - return markdown - - -def _chunk_markdown(content: str, chunk_size: int) -> list[dict[str, Any]]: - """Split markdown content into chunks while preserving structure.""" - chunks = [] - lines = content.split('\n') - current_chunk = [] - current_size = 0 - chunk_num = 1 - - for line in lines: - line_size = len(line) + 1 # +1 for newline - - # If adding this line would exceed chunk size and we have content - if current_size + line_size > chunk_size and current_chunk: - chunks.append({ - "chunk_number": chunk_num, - "content": '\n'.join(current_chunk), - "character_count": current_size, - "line_count": len(current_chunk) - }) - current_chunk = [] - current_size = 0 - chunk_num += 1 - - current_chunk.append(line) - current_size += line_size - - # Add final chunk if there's remaining content - if current_chunk: - chunks.append({ - "chunk_number": chunk_num, - "content": '\n'.join(current_chunk), - "character_count": current_size, - "line_count": len(current_chunk) - }) - - return chunks - - -def _extract_markdown_structure(content: str) -> dict[str, Any]: - """Extract structure information from markdown content.""" - import re - - structure = { - "headings": [], - "lists": 0, - "links": 0, - "images": 0, - "code_blocks": 0, - "tables": 0, - "line_count": len(content.split('\n')) - } - - lines = content.split('\n') - for i, line in enumerate(lines): - # Find headings - heading_match = re.match(r'^(#{1,6})\s+(.+)', line) - if heading_match: - level = len(heading_match.group(1)) - text = heading_match.group(2).strip() - structure["headings"].append({ - "level": level, - "text": text, - "line_number": i + 1 - }) - - # Count other elements - if re.match(r'^[-*+]\s+', line): - structure["lists"] += 1 - - structure["links"] += len(re.findall(r'\[([^\]]+)\]\([^)]+\)', line)) - structure["images"] += len(re.findall(r'!\[([^\]]*)\]\([^)]+\)', line)) - - if line.strip().startswith('```'): - structure["code_blocks"] += 1 - - if '|' in line and line.count('|') >= 2: - structure["tables"] += 1 - - return structure - - -async def _find_bookmark_content_range(doc, bookmark_name: str) -> dict[str, Any]: - """Find the content range for a specific bookmark.""" - try: - # Find bookmark start and end positions in the document - bookmark_starts = {} - bookmark_ends = {} - - # Look for bookmark markers in the document XML - for elem_idx, element in enumerate(doc.element.body): - # Look for bookmark start markers - for bookmark_start in element.xpath('.//w:bookmarkStart', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): - name = bookmark_start.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name') - if name == bookmark_name: - bookmark_id = bookmark_start.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}id') - bookmark_starts[bookmark_id] = elem_idx - - # Look for bookmark end markers - for bookmark_end in element.xpath('.//w:bookmarkEnd', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): - bookmark_id = 
bookmark_end.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}id') - if bookmark_id in bookmark_starts: - bookmark_ends[bookmark_id] = elem_idx - break - - # Find the bookmark range - for bookmark_id, start_idx in bookmark_starts.items(): - if bookmark_id in bookmark_ends: - end_idx = bookmark_ends[bookmark_id] - # Extend range to capture full sections (look for next major heading) - extended_end = min(end_idx + 50, len(doc.element.body) - 1) # Extend by 50 elements or end of doc - return { - 'start_idx': start_idx, - 'end_idx': extended_end, - 'bookmark_id': bookmark_id, - 'note': f"Extracting content from bookmark '{bookmark_name}' (elements {start_idx}-{extended_end})" - } - - return None # Bookmark not found - - except Exception: - return None # Error finding bookmark - - -async def _find_chapter_content_range(doc, chapter_name: str) -> dict[str, Any]: - """Find the content range for a specific chapter by heading text.""" - try: - # Find heading that matches the chapter name - chapter_start_idx = None - chapter_end_idx = None - - # Search through document elements for matching heading - for elem_idx, element in enumerate(doc.element.body): - # Check if this element is a paragraph with heading style - try: - para = element - if para.tag.endswith('}p'): # Word paragraph element - # Get the text content - text_content = ''.join(text_elem.text or '' for text_elem in para.xpath('.//w:t', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})) - - # Check if this matches our chapter name (case insensitive, flexible matching) - if text_content.strip() and chapter_name.lower() in text_content.lower().strip(): - # Check if it's actually a heading by looking at paragraph style - style_elem = para.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}) - if style_elem: - style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '') - if 'heading' in style_val.lower() or 'title' in style_val.lower(): - chapter_start_idx = elem_idx - break - # Also consider short text lines as potential headings - elif len(text_content.strip()) < 100: - chapter_start_idx = elem_idx - break - except Exception: - continue - - if chapter_start_idx is None: - return None # Chapter heading not found - - # Find the end of this chapter (next major heading or end of document) - chapter_end_idx = len(doc.element.body) - 1 # Default to end of document - - # Look for the next major heading to determine chapter end - for elem_idx in range(chapter_start_idx + 1, len(doc.element.body)): - try: - para = doc.element.body[elem_idx] - if para.tag.endswith('}p'): - # Check if this is a major heading (same level or higher than chapter start) - style_elem = para.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}) - if style_elem: - style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '') - if 'heading1' in style_val.lower() or 'title' in style_val.lower(): - chapter_end_idx = elem_idx - 1 - break - except Exception: - continue - - return { - 'start_idx': chapter_start_idx, - 'end_idx': chapter_end_idx, - 'chapter_name': chapter_name, - 'note': f"Extracting content for chapter '{chapter_name}' (elements {chapter_start_idx}-{chapter_end_idx})" - } - - except Exception: - return None # Error finding chapter - - -async def _get_available_headings(doc) -> list[str]: - """Extract available headings from the document 
to help users find chapter names.""" - try: - headings = [] - - # Search through document elements for headings - for element in doc.element.body[:100]: # Only check first 100 elements to avoid token issues - try: - if element.tag.endswith('}p'): # Word paragraph element - # Get the text content - text_content = ''.join(text_elem.text or '' for text_elem in element.xpath('.//w:t', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})) - - if text_content.strip(): - # Check if it's a heading by looking at paragraph style - style_elem = element.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}) - if style_elem: - style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '') - if 'heading' in style_val.lower() or 'title' in style_val.lower(): - headings.append(text_content.strip()[:100]) # Limit heading length - # Also consider short text lines as potential headings - elif len(text_content.strip()) < 100: - # Only add if it looks like a heading (not just short random text) - if any(word in text_content.lower() for word in ['chapter', 'section', 'part', 'introduction', 'conclusion']): - headings.append(text_content.strip()) - except Exception: - continue - - return headings[:20] # Return max 20 headings to avoid token issues - - except Exception: - return [] - - -async def _get_ultra_fast_summary(file_path: str) -> dict[str, Any]: - """Ultra-fast summary that extracts minimal data to prevent MCP token limits.""" - try: - import docx - doc = docx.Document(file_path) - - # Extract only the first few paragraphs and major headings - content_parts = [] - heading_count = 0 - paragraph_count = 0 - max_content_length = 2000 # Very short limit - current_length = 0 - - # Get basic structure info quickly - total_paragraphs = len(doc.paragraphs) - total_tables = len(doc.tables) - - # Extract bookmarks (chapter markers) - bookmarks = [] - try: - # Access document's bookmarks through the XML - for bookmark in doc.element.xpath('//w:bookmarkStart', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): - bookmark_name = bookmark.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name') - if bookmark_name and not bookmark_name.startswith('_'): # Skip system bookmarks - bookmarks.append(bookmark_name) - except Exception: - pass # Bookmarks extraction failed, continue without - - # Extract just a few key headings and the start of content - for para in doc.paragraphs[:50]: # Only check first 50 paragraphs - text = para.text.strip() - if not text: - continue - - # Check if it's a heading (simple heuristic) - is_heading = (para.style and "heading" in para.style.name.lower()) or len(text) < 100 - - if is_heading and heading_count < 10: # Max 10 headings - content_parts.append(f"# {text}") - heading_count += 1 - current_length += len(text) + 3 - elif paragraph_count < 5 and current_length < max_content_length: # Max 5 paragraphs - content_parts.append(text) - paragraph_count += 1 - current_length += len(text) - - if current_length > max_content_length: - break - - # Create very basic summary - summary_content = "\n\n".join(content_parts) - - # Extract available headings for chapter navigation - available_headings = await _get_available_headings(doc) - - return { - "content": summary_content, - "method_used": "ultra-fast-summary", - "table_of_contents": { - "note": "Use full document processing for detailed TOC", - "basic_info": f"Document has 
~{total_paragraphs} paragraphs, {total_tables} tables, {heading_count} headings found in first scan", - "bookmarks": bookmarks[:20] if bookmarks else [], # Limit to first 20 bookmarks - "bookmark_count": len(bookmarks), - "bookmark_note": "Bookmarks often indicate chapter starts. Use these as navigation hints for page_range extraction.", - "available_headings": available_headings[:10] if available_headings else [], # Limit to first 10 headings - "heading_count": len(available_headings), - "heading_note": "Use these headings with chapter_name parameter for chapter-based extraction when bookmarks are not available." - } - } - - except Exception as e: - return { - "content": f"Error creating summary: {str(e)}", - "method_used": "error-fallback", - "table_of_contents": {"note": "Summary generation failed"} - } - - -def _smart_truncate_content(content: str, max_chars: int) -> str: - """Intelligently truncate content while preserving structure and readability.""" - if len(content) <= max_chars: - return content - - lines = content.split('\n') - truncated_lines = [] - current_length = 0 - - # Try to preserve structure by stopping at a natural break point - for line in lines: - line_length = len(line) + 1 # +1 for newline - - # If adding this line would exceed limit - if current_length + line_length > max_chars: - # Try to find a good stopping point - if truncated_lines: - # Check if we're in the middle of a section - last_lines = '\n'.join(truncated_lines[-3:]) if len(truncated_lines) >= 3 else '\n'.join(truncated_lines) - - # If we stopped mid-paragraph, remove incomplete paragraph - if not (line.strip() == '' or line.startswith('#') or line.startswith('|')): - # Remove lines until we hit a natural break - while truncated_lines and not ( - truncated_lines[-1].strip() == '' or - truncated_lines[-1].startswith('#') or - truncated_lines[-1].startswith('|') or - truncated_lines[-1].startswith('-') or - truncated_lines[-1].startswith('*') - ): - truncated_lines.pop() - break - - truncated_lines.append(line) - current_length += line_length - - # Add truncation notice - result = '\n'.join(truncated_lines) - result += f"\n\n---\n**[CONTENT TRUNCATED]**\nShowing {len(result):,} of {len(content):,} characters.\nUse smaller page ranges (e.g., 3-5 pages) for full content without truncation.\n---" - - return result - - -def _estimate_section_length(heading_level: int) -> int: - """Estimate how many pages a section might span based on heading level.""" - # Higher level headings (H1) tend to have longer sections - if heading_level == 1: # Major chapters - return 8 - elif heading_level == 2: # Major sections - return 4 - elif heading_level == 3: # Subsections - return 2 - else: # Minor headings - return 1 - - -def _optimize_toc_page_ranges(toc_entries: list) -> dict[str, Any]: - """Optimize table of contents page ranges based on actual heading positions.""" - optimized_toc = { - "sections": [], - "total_sections": len(toc_entries), - "suggested_chunking": [] - } - - for i, entry in enumerate(toc_entries): - # Calculate actual end page based on next heading or document end - if i + 1 < len(toc_entries): - next_page = toc_entries[i + 1]["page"] - actual_end_page = max(entry["page"], next_page - 1) - else: - # Last section - use estimated length - actual_end_page = entry["page"] + _estimate_section_length(entry["level"]) - - optimized_entry = { - "level": entry["level"], - "title": entry["title"], - "start_page": entry["page"], - "estimated_end_page": actual_end_page, - "suggested_page_range": 
f"{entry['page']}-{actual_end_page}", - "section_type": _classify_section_type(entry["level"], entry["title"]) - } - optimized_toc["sections"].append(optimized_entry) - - # Generate chunking suggestions - optimized_toc["suggested_chunking"] = _generate_chunking_suggestions(optimized_toc["sections"]) - - return optimized_toc - - -def _classify_section_type(level: int, title: str) -> str: - """Classify section type based on level and title patterns.""" - title_lower = title.lower() - - if level == 1: - if any(word in title_lower for word in ["chapter", "part", "section"]): - return "chapter" - elif any(word in title_lower for word in ["introduction", "conclusion", "summary"]): - return "special_section" - else: - return "major_section" - elif level == 2: - return "section" - elif level == 3: - return "subsection" - else: - return "minor_heading" - - -def _generate_chunking_suggestions(sections: list) -> list[dict[str, Any]]: - """Generate smart chunking suggestions based on document structure.""" - suggestions = [] - current_chunk_pages = 0 - chunk_start = 1 - chunk_sections = [] - - for section in sections: - section_pages = section["estimated_end_page"] - section["start_page"] + 1 - - # If adding this section would make chunk too large, finalize current chunk - # Use smaller chunks (8 pages) to prevent MCP token limit issues - if current_chunk_pages + section_pages > 8 and chunk_sections: - suggestions.append({ - "chunk_number": len(suggestions) + 1, - "page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}", - "sections_included": [s["title"] for s in chunk_sections], - "estimated_pages": current_chunk_pages, - "description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" + - (f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "") - }) - - # Start new chunk - chunk_start = section["start_page"] - current_chunk_pages = section_pages - chunk_sections = [section] - else: - # Add to current chunk - current_chunk_pages += section_pages - chunk_sections.append(section) - - # Add final chunk if any sections remain - if chunk_sections: - suggestions.append({ - "chunk_number": len(suggestions) + 1, - "page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}", - "sections_included": [s["title"] for s in chunk_sections], - "estimated_pages": current_chunk_pages, - "description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" + - (f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "") - }) - - return suggestions - - -def _has_page_break(paragraph) -> bool: - """Check if a paragraph contains a page break.""" - try: - # Check for explicit page breaks in paragraph runs - for run in paragraph.runs: - if run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') is not None: - br_elem = run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') - if br_elem is not None and br_elem.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}type') == 'page': - return True - return False - except Exception: - return False - - -def _parse_page_range(page_range: str) -> list[int]: - """Parse page range string into list of page numbers. 
- - Examples: - "1-5" -> [1, 2, 3, 4, 5] - "1,3,5" -> [1, 3, 5] - "1-3,5,7-9" -> [1, 2, 3, 5, 7, 8, 9] - """ - pages = set() - - for part in page_range.split(','): - part = part.strip() - if '-' in part: - # Handle range like "1-5" - start, end = part.split('-', 1) - try: - start_num = int(start.strip()) - end_num = int(end.strip()) - pages.update(range(start_num, end_num + 1)) - except ValueError: - continue - else: - # Handle single page like "3" - try: - pages.add(int(part)) - except ValueError: - continue - - return sorted(list(pages)) - - -async def _analyze_document_size(file_path: str, extension: str) -> dict[str, Any]: - """Analyze document to estimate size and complexity.""" - analysis = { - "estimated_pages": 1, - "file_size_mb": 0, - "complexity": "simple", - "estimated_content_size": "small" - } - - try: - # Get file size - from pathlib import Path - file_size = Path(file_path).stat().st_size - analysis["file_size_mb"] = round(file_size / (1024 * 1024), 2) - - if extension == ".docx": - try: - import docx - doc = docx.Document(file_path) - - # Estimate pages based on content - paragraph_count = len(doc.paragraphs) - table_count = len(doc.tables) - - # Rough estimation: ~40 paragraphs per page - estimated_pages = max(1, paragraph_count // 40) - analysis["estimated_pages"] = estimated_pages - - # Determine complexity - if table_count > 10 or paragraph_count > 500: - analysis["complexity"] = "complex" - elif table_count > 5 or paragraph_count > 200: - analysis["complexity"] = "moderate" - - # Estimate content size - if estimated_pages > 20: - analysis["estimated_content_size"] = "very_large" - elif estimated_pages > 10: - analysis["estimated_content_size"] = "large" - elif estimated_pages > 5: - analysis["estimated_content_size"] = "medium" - - except Exception: - # Fallback to file size estimation - if file_size > 5 * 1024 * 1024: # 5MB - analysis["estimated_pages"] = 50 - analysis["estimated_content_size"] = "very_large" - elif file_size > 1 * 1024 * 1024: # 1MB - analysis["estimated_pages"] = 20 - analysis["estimated_content_size"] = "large" - elif file_size > 500 * 1024: # 500KB - analysis["estimated_pages"] = 10 - analysis["estimated_content_size"] = "medium" - - except Exception: - pass - - return analysis - - -def _get_processing_recommendation( - doc_analysis: dict[str, Any], - page_range: str, - summary_only: bool -) -> dict[str, Any]: - """Generate intelligent processing recommendations based on document analysis.""" - - estimated_pages = doc_analysis["estimated_pages"] - content_size = doc_analysis["estimated_content_size"] - - recommendation = { - "status": "optimal", - "message": "", - "suggested_workflow": [], - "warnings": [] - } - - # Large document recommendations - if content_size in ["large", "very_large"] and not page_range and not summary_only: - recommendation["status"] = "suboptimal" - recommendation["message"] = ( - f"⚠️ Large document detected ({estimated_pages} estimated pages). " - "Consider using recommended workflow for better performance." - ) - recommendation["suggested_workflow"] = [ - "1. First: Call with summary_only=true to get document overview and TOC", - "2. Then: Use page_range to process specific sections (e.g., '1-5', '6-10', '15-20')", - "3. Recommended: Use 3-8 page chunks to stay under 25k token MCP limit", - "4. 
The tool auto-truncates if content is too large, but smaller ranges work better" - ] - recommendation["warnings"] = [ - "Page ranges >8 pages may hit 25k token response limit and get truncated", - "Use smaller page ranges (3-5 pages) for dense content documents", - "Auto-truncation preserves structure but loses content completeness" - ] - - # Medium document recommendations - elif content_size == "medium" and not page_range and not summary_only: - recommendation["status"] = "caution" - recommendation["message"] = ( - f"Medium document detected ({estimated_pages} estimated pages). " - "Consider summary_only=true first if you encounter response size issues." - ) - recommendation["suggested_workflow"] = [ - "Option 1: Try full processing (current approach)", - "Option 2: Use summary_only=true first, then page_range if needed" - ] - - # Optimal usage patterns - elif summary_only: - recommendation["message"] = "✅ Excellent! Using summary mode for initial document analysis." - recommendation["suggested_workflow"] = [ - "After reviewing summary, use page_range to extract specific sections of interest" - ] - - elif page_range and content_size in ["large", "very_large"]: - recommendation["message"] = "✅ Perfect! Using page-range processing for efficient extraction." - - elif content_size == "small": - recommendation["message"] = "✅ Small document - full processing is optimal." - - return recommendation - - -def main(): - """Main entry point for the MCP server.""" - import sys - - if len(sys.argv) > 1 and sys.argv[1] == "--version": - from . import __version__ - print(f"MCP Office Tools v{__version__}") - return - - # Run the FastMCP server - app.run() - - -if __name__ == "__main__": - main() diff --git a/src/mcp_office_tools/utils/__init__.py b/src/mcp_office_tools/utils/__init__.py index cb228e7..d652e27 100644 --- a/src/mcp_office_tools/utils/__init__.py +++ b/src/mcp_office_tools/utils/__init__.py @@ -27,6 +27,48 @@ from .decorators import ( handle_office_errors ) +from .processing import ( + TEMP_DIR, + DEBUG, + _extract_basic_metadata, + _calculate_health_score, + _get_health_recommendations, + _smart_truncate_content, + _parse_page_range, + _get_processing_recommendation, +) + +from .word_processing import ( + _extract_word_text, + _extract_word_images, + _extract_word_metadata, + _convert_docx_to_markdown, + _convert_docx_with_python_docx, + _convert_doc_to_markdown, + _get_ultra_fast_summary, + _find_bookmark_content_range, + _find_chapter_content_range, + _get_available_headings, + _has_page_break, + _analyze_document_size, + _paragraph_to_markdown, + _table_to_markdown, + _html_to_markdown, + _extract_markdown_structure, +) + +from .excel_processing import ( + _extract_excel_text, + _extract_excel_images, + _extract_excel_metadata, +) + +from .powerpoint_processing import ( + _extract_powerpoint_text, + _extract_powerpoint_images, + _extract_powerpoint_metadata, +) + __all__ = [ # Validation "OfficeFileError", diff --git a/src/mcp_office_tools/utils/excel_processing.py b/src/mcp_office_tools/utils/excel_processing.py new file mode 100644 index 0000000..a7a555e --- /dev/null +++ b/src/mcp_office_tools/utils/excel_processing.py @@ -0,0 +1,203 @@ +"""Excel document processing utilities. + +This module provides helper functions for extracting text, images, and metadata +from Excel documents (.xlsx, .xls, .xlsm, .csv) with intelligent method selection +and fallback support. +""" + +from typing import Any + +from . 
import OfficeFileError + + +async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: + """Extract text from Excel documents.""" + methods_tried = [] + + if extension == ".csv": + # CSV handling + import pandas as pd + try: + df = pd.read_csv(file_path) + text = df.to_string() + return { + "text": text, + "method_used": "pandas", + "methods_tried": ["pandas"], + "formatted_sections": [{"type": "table", "data": df.to_dict()}] if preserve_formatting else [] + } + except Exception as e: + raise OfficeFileError(f"CSV processing failed: {str(e)}") + + # Excel file handling + text = "" + formatted_sections = [] + method_used = None + + method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"] + + for method_name in method_order: + try: + methods_tried.append(method_name) + + if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]: + import openpyxl + wb = openpyxl.load_workbook(file_path, data_only=True) + + text_parts = [] + for sheet_name in wb.sheetnames: + ws = wb[sheet_name] + text_parts.append(f"Sheet: {sheet_name}") + + for row in ws.iter_rows(values_only=True): + row_text = "\t".join(str(cell) if cell is not None else "" for cell in row) + if row_text.strip(): + text_parts.append(row_text) + + if preserve_formatting: + formatted_sections.append({ + "type": "worksheet", + "name": sheet_name, + "data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()] + }) + + text = "\n".join(text_parts) + method_used = "openpyxl" + break + + elif method_name == "pandas": + import pandas as pd + + if extension in [".xlsx", ".xlsm"]: + dfs = pd.read_excel(file_path, sheet_name=None) + else: # .xls + dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd') + + text_parts = [] + for sheet_name, df in dfs.items(): + text_parts.append(f"Sheet: {sheet_name}") + text_parts.append(df.to_string()) + + if preserve_formatting: + formatted_sections.append({ + "type": "dataframe", + "name": sheet_name, + "data": df.to_dict() + }) + + text = "\n\n".join(text_parts) + method_used = "pandas" + break + + elif method_name == "xlrd" and extension == ".xls": + import xlrd + wb = xlrd.open_workbook(file_path) + + text_parts = [] + for sheet in wb.sheets(): + text_parts.append(f"Sheet: {sheet.name}") + + for row_idx in range(sheet.nrows): + row = sheet.row_values(row_idx) + row_text = "\t".join(str(cell) for cell in row) + text_parts.append(row_text) + + text = "\n".join(text_parts) + method_used = "xlrd" + break + + except ImportError: + continue + except Exception: + continue + + if not method_used: + raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}") + + return { + "text": text, + "method_used": method_used, + "methods_tried": methods_tried, + "formatted_sections": formatted_sections + } + + +async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: + """Extract images from Excel documents.""" + import io + import os + import tempfile + import zipfile + + from PIL import Image + + images = [] + TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir()) + + if extension in [".xlsx", ".xlsm"]: + try: + with zipfile.ZipFile(file_path, 'r') as zip_file: + # Look for images in media folder + image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')] + + for i, img_path in enumerate(image_files): + try: + img_data 
= zip_file.read(img_path) + img = Image.open(io.BytesIO(img_data)) + + # Size filtering + if img.width >= min_width and img.height >= min_height: + # Save to temp file + temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}") + img.save(temp_path, format=output_format.upper()) + + images.append({ + "index": i, + "filename": os.path.basename(img_path), + "path": temp_path, + "width": img.width, + "height": img.height, + "format": img.format, + "size_bytes": len(img_data) + }) + except Exception: + continue + + except Exception as e: + raise OfficeFileError(f"Excel image extraction failed: {str(e)}") + + return images + + +async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]: + """Extract Excel-specific metadata.""" + metadata = {"type": "excel", "extension": extension} + + if extension in [".xlsx", ".xlsm"]: + try: + import openpyxl + wb = openpyxl.load_workbook(file_path) + + props = wb.properties + metadata.update({ + "title": props.title, + "creator": props.creator, + "subject": props.subject, + "description": props.description, + "keywords": props.keywords, + "created": str(props.created) if props.created else None, + "modified": str(props.modified) if props.modified else None + }) + + # Workbook structure + metadata.update({ + "worksheet_count": len(wb.worksheets), + "worksheet_names": wb.sheetnames, + "has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets), + "has_images": any(len(ws._images) > 0 for ws in wb.worksheets) + }) + + except Exception: + pass + + return metadata diff --git a/src/mcp_office_tools/utils/powerpoint_processing.py b/src/mcp_office_tools/utils/powerpoint_processing.py new file mode 100644 index 0000000..3de91c5 --- /dev/null +++ b/src/mcp_office_tools/utils/powerpoint_processing.py @@ -0,0 +1,177 @@ +"""PowerPoint document processing utilities. + +This module provides helper functions for extracting text, images, and metadata +from PowerPoint documents (.pptx and .ppt files). +""" + +import io +import os +import zipfile +from typing import Any + +from PIL import Image + +from . 
import OfficeFileError + + +async def _extract_powerpoint_text( + file_path: str, extension: str, preserve_formatting: bool, method: str +) -> dict[str, Any]: + """Extract text from PowerPoint documents.""" + methods_tried = [] + + if extension == ".pptx": + try: + import pptx + + prs = pptx.Presentation(file_path) + + text_parts = [] + formatted_sections = [] + + for slide_num, slide in enumerate(prs.slides, 1): + slide_text_parts = [] + + for shape in slide.shapes: + if hasattr(shape, "text") and shape.text: + slide_text_parts.append(shape.text) + + slide_text = "\n".join(slide_text_parts) + text_parts.append(f"Slide {slide_num}:\n{slide_text}") + + if preserve_formatting: + formatted_sections.append( + { + "type": "slide", + "number": slide_num, + "text": slide_text, + "shapes": len(slide.shapes), + } + ) + + text = "\n\n".join(text_parts) + + return { + "text": text, + "method_used": "python-pptx", + "methods_tried": ["python-pptx"], + "formatted_sections": formatted_sections, + } + + except ImportError: + methods_tried.append("python-pptx") + except Exception: + methods_tried.append("python-pptx") + + # Legacy .ppt handling would require additional libraries + if extension == ".ppt": + raise OfficeFileError( + "Legacy PowerPoint (.ppt) text extraction requires additional setup" + ) + + raise OfficeFileError( + f"Failed to extract text using methods: {', '.join(methods_tried)}" + ) + + +async def _extract_powerpoint_images( + file_path: str, + extension: str, + output_format: str, + min_width: int, + min_height: int, + temp_dir: str, +) -> list[dict[str, Any]]: + """Extract images from PowerPoint documents.""" + images = [] + + if extension == ".pptx": + try: + with zipfile.ZipFile(file_path, "r") as zip_file: + # Look for images in media folder + image_files = [ + f for f in zip_file.namelist() if f.startswith("ppt/media/") + ] + + for i, img_path in enumerate(image_files): + try: + img_data = zip_file.read(img_path) + img = Image.open(io.BytesIO(img_data)) + + # Size filtering + if img.width >= min_width and img.height >= min_height: + # Save to temp file + temp_path = os.path.join( + temp_dir, f"powerpoint_image_{i}.{output_format}" + ) + img.save(temp_path, format=output_format.upper()) + + images.append( + { + "index": i, + "filename": os.path.basename(img_path), + "path": temp_path, + "width": img.width, + "height": img.height, + "format": img.format, + "size_bytes": len(img_data), + } + ) + except Exception: + continue + + except Exception as e: + raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}") + + return images + + +async def _extract_powerpoint_metadata( + file_path: str, extension: str +) -> dict[str, Any]: + """Extract PowerPoint-specific metadata.""" + metadata = {"type": "powerpoint", "extension": extension} + + if extension == ".pptx": + try: + import pptx + + prs = pptx.Presentation(file_path) + + core_props = prs.core_properties + metadata.update( + { + "title": core_props.title, + "author": core_props.author, + "subject": core_props.subject, + "keywords": core_props.keywords, + "comments": core_props.comments, + "created": str(core_props.created) if core_props.created else None, + "modified": str(core_props.modified) + if core_props.modified + else None, + } + ) + + # Presentation structure + slide_layouts = set() + total_shapes = 0 + + for slide in prs.slides: + slide_layouts.add(slide.slide_layout.name) + total_shapes += len(slide.shapes) + + metadata.update( + { + "slide_count": len(prs.slides), + "slide_layouts": list(slide_layouts), + 
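These helpers are coroutines, so callers need an event loop, and the relocated image extractor now takes an explicit temp_dir argument. A minimal usage sketch, assuming the mcp_office_tools.utils re-exports and a hypothetical deck.pptx:

# Sketch: driving the relocated PowerPoint helpers directly (file name is hypothetical).
import asyncio
import tempfile

from mcp_office_tools.utils import _extract_powerpoint_images, _extract_powerpoint_text

async def demo() -> None:
    text = await _extract_powerpoint_text("deck.pptx", ".pptx", preserve_formatting=True, method="auto")
    print(text["method_used"], len(text["formatted_sections"]), "slides")

    # Unlike the old monolithic helper, this version takes the target temp directory explicitly.
    images = await _extract_powerpoint_images(
        "deck.pptx", ".pptx", "png", 100, 100, tempfile.gettempdir()
    )
    print(f"{len(images)} images kept after size filtering")

asyncio.run(demo())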
"total_shapes": total_shapes, + "slide_width": prs.slide_width, + "slide_height": prs.slide_height, + } + ) + + except Exception: + pass + + return metadata diff --git a/src/mcp_office_tools/utils/processing.py b/src/mcp_office_tools/utils/processing.py new file mode 100644 index 0000000..5f5a4ae --- /dev/null +++ b/src/mcp_office_tools/utils/processing.py @@ -0,0 +1,228 @@ +"""Universal processing helper functions for Office documents. + +This module contains helper functions used across different document processing +operations including metadata extraction, health scoring, content truncation, +and page range parsing. +""" + +import os +import tempfile +from typing import Any + +# Configuration +TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir()) +DEBUG = os.environ.get("DEBUG", "false").lower() == "true" + + +async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]: + """Extract basic metadata from Office documents.""" + metadata = {"category": category, "extension": extension} + + try: + if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]: + import zipfile + + with zipfile.ZipFile(file_path, 'r') as zip_file: + # Core properties + if 'docProps/core.xml' in zip_file.namelist(): + zip_file.read('docProps/core.xml').decode('utf-8') + metadata["has_core_properties"] = True + + # App properties + if 'docProps/app.xml' in zip_file.namelist(): + zip_file.read('docProps/app.xml').decode('utf-8') + metadata["has_app_properties"] = True + + except Exception: + pass + + return metadata + + +def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int: + """Calculate document health score (1-10).""" + score = 10 + + # Deduct for validation errors + if not validation["is_valid"]: + score -= 5 + + if validation["errors"]: + score -= len(validation["errors"]) * 2 + + if validation["warnings"]: + score -= len(validation["warnings"]) + + # Deduct for problematic characteristics + if validation.get("password_protected"): + score -= 1 + + if format_info.get("is_legacy"): + score -= 1 + + structure = format_info.get("structure", {}) + if structure.get("estimated_complexity") == "complex": + score -= 1 + + return max(1, min(10, score)) + + +def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]: + """Get health improvement recommendations.""" + recommendations = [] + + if validation["errors"]: + recommendations.append("Fix validation errors before processing") + + if validation.get("password_protected"): + recommendations.append("Remove password protection if possible") + + if format_info.get("is_legacy"): + recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)") + + structure = format_info.get("structure", {}) + if structure.get("estimated_complexity") == "complex": + recommendations.append("Complex document may require specialized processing") + + if not recommendations: + recommendations.append("Document appears healthy and ready for processing") + + return recommendations + + +def _smart_truncate_content(content: str, max_chars: int) -> str: + """Intelligently truncate content while preserving structure and readability.""" + if len(content) <= max_chars: + return content + + lines = content.split('\n') + truncated_lines = [] + current_length = 0 + + # Try to preserve structure by stopping at a natural break point + for line in lines: + line_length = len(line) + 1 # +1 for newline + + # If adding this 
line would exceed limit + if current_length + line_length > max_chars: + # Try to find a good stopping point + if truncated_lines: + # Check if we're in the middle of a section + last_lines = '\n'.join(truncated_lines[-3:]) if len(truncated_lines) >= 3 else '\n'.join(truncated_lines) + + # If we stopped mid-paragraph, remove incomplete paragraph + if not (line.strip() == '' or line.startswith('#') or line.startswith('|')): + # Remove lines until we hit a natural break + while truncated_lines and not ( + truncated_lines[-1].strip() == '' or + truncated_lines[-1].startswith('#') or + truncated_lines[-1].startswith('|') or + truncated_lines[-1].startswith('-') or + truncated_lines[-1].startswith('*') + ): + truncated_lines.pop() + break + + truncated_lines.append(line) + current_length += line_length + + # Add truncation notice + result = '\n'.join(truncated_lines) + result += f"\n\n---\n**[CONTENT TRUNCATED]**\nShowing {len(result):,} of {len(content):,} characters.\nUse smaller page ranges (e.g., 3-5 pages) for full content without truncation.\n---" + + return result + + +def _parse_page_range(page_range: str) -> list[int]: + """Parse page range string into list of page numbers. + + Examples: + "1-5" -> [1, 2, 3, 4, 5] + "1,3,5" -> [1, 3, 5] + "1-3,5,7-9" -> [1, 2, 3, 5, 7, 8, 9] + """ + pages = set() + + for part in page_range.split(','): + part = part.strip() + if '-' in part: + # Handle range like "1-5" + start, end = part.split('-', 1) + try: + start_num = int(start.strip()) + end_num = int(end.strip()) + pages.update(range(start_num, end_num + 1)) + except ValueError: + continue + else: + # Handle single page like "3" + try: + pages.add(int(part)) + except ValueError: + continue + + return sorted(list(pages)) + + +def _get_processing_recommendation( + doc_analysis: dict[str, Any], + page_range: str, + summary_only: bool +) -> dict[str, Any]: + """Generate intelligent processing recommendations based on document analysis.""" + + estimated_pages = doc_analysis["estimated_pages"] + content_size = doc_analysis["estimated_content_size"] + + recommendation = { + "status": "optimal", + "message": "", + "suggested_workflow": [], + "warnings": [] + } + + # Large document recommendations + if content_size in ["large", "very_large"] and not page_range and not summary_only: + recommendation["status"] = "suboptimal" + recommendation["message"] = ( + f"⚠️ Large document detected ({estimated_pages} estimated pages). " + "Consider using recommended workflow for better performance." + ) + recommendation["suggested_workflow"] = [ + "1. First: Call with summary_only=true to get document overview and TOC", + "2. Then: Use page_range to process specific sections (e.g., '1-5', '6-10', '15-20')", + "3. Recommended: Use 3-8 page chunks to stay under 25k token MCP limit", + "4. The tool auto-truncates if content is too large, but smaller ranges work better" + ] + recommendation["warnings"] = [ + "Page ranges >8 pages may hit 25k token response limit and get truncated", + "Use smaller page ranges (3-5 pages) for dense content documents", + "Auto-truncation preserves structure but loses content completeness" + ] + + # Medium document recommendations + elif content_size == "medium" and not page_range and not summary_only: + recommendation["status"] = "caution" + recommendation["message"] = ( + f"Medium document detected ({estimated_pages} estimated pages). " + "Consider summary_only=true first if you encounter response size issues." 
+ ) + recommendation["suggested_workflow"] = [ + "Option 1: Try full processing (current approach)", + "Option 2: Use summary_only=true first, then page_range if needed" + ] + + # Optimal usage patterns + elif summary_only: + recommendation["message"] = "✅ Excellent! Using summary mode for initial document analysis." + recommendation["suggested_workflow"] = [ + "After reviewing summary, use page_range to extract specific sections of interest" + ] + + elif page_range and content_size in ["large", "very_large"]: + recommendation["message"] = "✅ Perfect! Using page-range processing for efficient extraction." + + elif content_size == "small": + recommendation["message"] = "✅ Small document - full processing is optimal." + + return recommendation diff --git a/src/mcp_office_tools/server_monolithic.py b/src/mcp_office_tools/utils/word_processing.py similarity index 53% rename from src/mcp_office_tools/server_monolithic.py rename to src/mcp_office_tools/utils/word_processing.py index fcfad65..9a68ca3 100644 --- a/src/mcp_office_tools/server_monolithic.py +++ b/src/mcp_office_tools/utils/word_processing.py @@ -1,474 +1,28 @@ -"""MCP Office Tools Server - Comprehensive Microsoft Office document processing. +"""Word document processing utilities. -FastMCP server providing 30+ tools for processing Word, Excel, PowerPoint documents -including both modern formats (.docx, .xlsx, .pptx) and legacy formats (.doc, .xls, .ppt). +Helper functions for extracting text, images, metadata, and converting Word documents +to markdown format with support for page ranges, bookmarks, and chapter-based extraction. """ +import base64 +import io import os +import re import tempfile -import time +import zipfile from pathlib import Path from typing import Any -from fastmcp import FastMCP -from pydantic import Field +from PIL import Image -from .utils import ( - OfficeFileError, - classify_document_type, - detect_format, - get_supported_extensions, - resolve_office_file_path, - validate_office_file, -) - -# Initialize FastMCP app -app = FastMCP("MCP Office Tools") - -# Configuration +# Temp directory configuration TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir()) -DEBUG = os.environ.get("DEBUG", "false").lower() == "true" -@app.tool() -async def extract_text( - file_path: str = Field(description="Path to Office document or URL"), - preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"), - include_metadata: bool = Field(default=True, description="Include document metadata in output"), - method: str = Field(default="auto", description="Extraction method: auto, primary, fallback") -) -> dict[str, Any]: - """Extract text content from Office documents with intelligent method selection. - - Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt), - and CSV files. Uses multi-library fallback for maximum compatibility. 
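With the monolithic module renamed into utils/word_processing.py, the extraction helpers can be exercised without starting the FastMCP app. A minimal sketch, assuming the package-level re-exports and a hypothetical report.docx:

# Sketch: calling the relocated Word text extractor directly (file name is hypothetical).
import asyncio

from mcp_office_tools.utils import _extract_word_text

async def demo() -> None:
    result = await _extract_word_text("report.docx", ".docx", preserve_formatting=False, method="auto")
    print(result["method_used"])
    print(result["text"][:200])

asyncio.run(demo())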
- """ - start_time = time.time() - - try: - # Resolve file path (download if URL) - local_path = await resolve_office_file_path(file_path) - - # Validate file - validation = await validate_office_file(local_path) - if not validation["is_valid"]: - raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - - # Get format info - format_info = await detect_format(local_path) - category = format_info["category"] - extension = format_info["extension"] - - # Route to appropriate extraction method - if category == "word": - text_result = await _extract_word_text(local_path, extension, preserve_formatting, method) - elif category == "excel": - text_result = await _extract_excel_text(local_path, extension, preserve_formatting, method) - elif category == "powerpoint": - text_result = await _extract_powerpoint_text(local_path, extension, preserve_formatting, method) - else: - raise OfficeFileError(f"Unsupported document category: {category}") - - # Compile results - result = { - "text": text_result["text"], - "method_used": text_result["method_used"], - "character_count": len(text_result["text"]), - "word_count": len(text_result["text"].split()) if text_result["text"] else 0, - "extraction_time": round(time.time() - start_time, 3), - "format_info": { - "format": format_info["format_name"], - "category": category, - "is_legacy": format_info["is_legacy"] - } - } - - if include_metadata: - result["metadata"] = await _extract_basic_metadata(local_path, extension, category) - - if preserve_formatting: - result["formatted_sections"] = text_result.get("formatted_sections", []) - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Text extraction failed: {str(e)}") - - -@app.tool() -async def extract_images( - file_path: str = Field(description="Path to Office document or URL"), - output_format: str = Field(default="png", description="Output image format: png, jpg, jpeg"), - min_width: int = Field(default=100, description="Minimum image width in pixels"), - min_height: int = Field(default=100, description="Minimum image height in pixels"), - include_metadata: bool = Field(default=True, description="Include image metadata") -) -> dict[str, Any]: - """Extract images from Office documents with size filtering and format conversion.""" - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Validate file - validation = await validate_office_file(local_path) - if not validation["is_valid"]: - raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - - # Get format info - format_info = await detect_format(local_path) - category = format_info["category"] - extension = format_info["extension"] - - # Extract images based on format - if category == "word": - images = await _extract_word_images(local_path, extension, output_format, min_width, min_height) - elif category == "excel": - images = await _extract_excel_images(local_path, extension, output_format, min_width, min_height) - elif category == "powerpoint": - images = await _extract_powerpoint_images(local_path, extension, output_format, min_width, min_height) - else: - raise OfficeFileError(f"Image extraction not supported for category: {category}") - - result = { - "images": images, - "image_count": len(images), - "extraction_time": round(time.time() - start_time, 3), - "format_info": { - "format": format_info["format_name"], - "category": category - } - } - - if include_metadata: - 
result["total_size_bytes"] = sum(img.get("size_bytes", 0) for img in images) - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Image extraction failed: {str(e)}") - - -@app.tool() -async def extract_metadata( - file_path: str = Field(description="Path to Office document or URL") -) -> dict[str, Any]: - """Extract comprehensive metadata from Office documents.""" - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Validate file - validation = await validate_office_file(local_path) - if not validation["is_valid"]: - raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - - # Get format info - format_info = await detect_format(local_path) - category = format_info["category"] - extension = format_info["extension"] - - # Extract metadata based on format - if category == "word": - metadata = await _extract_word_metadata(local_path, extension) - elif category == "excel": - metadata = await _extract_excel_metadata(local_path, extension) - elif category == "powerpoint": - metadata = await _extract_powerpoint_metadata(local_path, extension) - else: - metadata = {"category": category, "basic_info": "Limited metadata available"} - - # Add file system metadata - path = Path(local_path) - stat = path.stat() - - result = { - "document_metadata": metadata, - "file_metadata": { - "filename": path.name, - "file_size": stat.st_size, - "created": stat.st_ctime, - "modified": stat.st_mtime, - "extension": extension - }, - "format_info": format_info, - "extraction_time": round(time.time() - start_time, 3) - } - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Metadata extraction failed: {str(e)}") - - -@app.tool() -async def detect_office_format( - file_path: str = Field(description="Path to Office document or URL") -) -> dict[str, Any]: - """Intelligent Office document format detection and analysis.""" - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Detect format - format_info = await detect_format(local_path) - - # Classify document - classification = await classify_document_type(local_path) - - result = { - "format_detection": format_info, - "document_classification": classification, - "supported": format_info["is_supported"], - "processing_recommendations": format_info.get("processing_hints", []), - "detection_time": round(time.time() - start_time, 3) - } - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Format detection failed: {str(e)}") - - -@app.tool() -async def analyze_document_health( - file_path: str = Field(description="Path to Office document or URL") -) -> dict[str, Any]: - """Comprehensive document health and integrity analysis.""" - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Validate file thoroughly - validation = await validate_office_file(local_path) - - # Get format info - format_info = await detect_format(local_path) - - # Health assessment - health_score = _calculate_health_score(validation, format_info) - - result = { - "overall_health": "healthy" if validation["is_valid"] and health_score >= 8 else - "warning" if health_score >= 5 else "problematic", - "health_score": health_score, - "validation_results": validation, - 
"format_analysis": format_info, - "recommendations": _get_health_recommendations(validation, format_info), - "analysis_time": round(time.time() - start_time, 3) - } - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Health analysis failed: {str(e)}") - - -@app.tool() -async def convert_to_markdown( - file_path: str = Field(description="Path to Office document or URL"), - include_images: bool = Field(default=True, description="Include images in markdown output. When True, images are extracted to files and linked in the markdown."), - image_mode: str = Field(default="files", description="Image handling mode: 'files' (default, saves to disk and links), 'base64' (embeds inline - WARNING: can create massive responses), or 'references' (metadata only, no content)"), - max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding (only used when image_mode='base64')"), - preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"), - page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). RECOMMENDED for large documents. Empty = all pages"), - bookmark_name: str = Field(default="", description="Extract content for a specific bookmark/chapter (e.g., 'Chapter1_Start'). More reliable than page ranges."), - chapter_name: str = Field(default="", description="Extract content for a chapter by heading text (e.g., 'Chapter 1', 'Introduction'). Works when bookmarks aren't available."), - summary_only: bool = Field(default=False, description="Return only metadata and truncated summary. STRONGLY RECOMMENDED for large docs (>10 pages)"), - output_dir: str = Field(default="", description="Output directory for extracted image files. If empty, uses a temp directory based on document name.") -) -> dict[str, Any]: - """Convert Office documents to Markdown format with intelligent processing recommendations. - - ⚠️ RECOMMENDED WORKFLOW FOR LARGE DOCUMENTS (>5 pages): - 1. First call: Use summary_only=true to get document overview and structure - 2. Then: Use page_range (e.g., "1-10", "15-25") to process specific sections - - This prevents response size errors and provides efficient processing. - Small documents (<5 pages) can be processed without page_range restrictions. 
- """ - start_time = time.time() - - try: - # Resolve file path - local_path = await resolve_office_file_path(file_path) - - # Validate file - validation = await validate_office_file(local_path) - if not validation["is_valid"]: - raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - - # Get format info - format_info = await detect_format(local_path) - category = format_info["category"] - extension = format_info["extension"] - - # Currently focused on Word documents for markdown conversion - if category != "word": - raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}") - - # Analyze document size and provide intelligent recommendations - doc_analysis = await _analyze_document_size(local_path, extension) - processing_recommendation = _get_processing_recommendation( - doc_analysis, page_range, summary_only - ) - - # Parse page range if provided - page_numbers = _parse_page_range(page_range) if page_range else None - - # Prioritize bookmark/chapter extraction over page ranges - if bookmark_name or chapter_name: - page_numbers = None # Ignore page ranges when bookmark or chapter is specified - - # Convert to markdown based on format - if extension == ".docx": - markdown_result = await _convert_docx_to_markdown( - local_path, include_images, image_mode, max_image_size, - preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name - ) - else: # .doc - # For legacy .doc files, use mammoth if available - markdown_result = await _convert_doc_to_markdown( - local_path, include_images, image_mode, max_image_size, - preserve_structure, page_numbers, summary_only, output_dir - ) - - # Build result based on mode - result = { - "metadata": { - "original_file": os.path.basename(local_path), - "format": format_info["format_name"], - "conversion_method": markdown_result["method_used"], - "conversion_time": round(time.time() - start_time, 3), - "summary_only": summary_only, - "document_analysis": doc_analysis, - "processing_recommendation": processing_recommendation - } - } - - # Add page range info if used - if page_range: - result["metadata"]["page_range"] = page_range - result["metadata"]["pages_processed"] = len(page_numbers) if page_numbers else 0 - - # Add content based on mode - if summary_only: - # VERY restrictive summary mode to prevent massive responses - result["metadata"]["character_count"] = len(markdown_result["content"]) - result["metadata"]["word_count"] = len(markdown_result["content"].split()) - - # Ultra-short summary (only 500 chars max) - result["summary"] = markdown_result["content"][:500] + "..." if len(markdown_result["content"]) > 500 else markdown_result["content"] - - # Severely limit table of contents to prevent 1M+ token responses - if "table_of_contents" in markdown_result: - toc = markdown_result["table_of_contents"] - if "sections" in toc and len(toc["sections"]) > 20: - # Limit to first 20 sections only - limited_toc = { - "sections": toc["sections"][:20], - "total_sections": len(toc["sections"]), - "showing_first": 20, - "note": f"Showing first 20 of {len(toc['sections'])} sections. 
Use page_range to extract specific sections.", - "suggested_chunking": toc.get("suggested_chunking", [])[:10] # Limit chunking suggestions too - } - result["table_of_contents"] = limited_toc - else: - result["table_of_contents"] = toc - else: - # Include content with automatic size limiting to prevent MCP errors - content = markdown_result["content"] - - # Apply aggressive content limiting to stay under 25k token limit - # Rough estimate: ~4 chars per token, leave buffer for metadata - max_content_chars = 80000 # ~20k tokens worth of content - - if len(content) > max_content_chars: - # Truncate but try to preserve structure - truncated_content = _smart_truncate_content(content, max_content_chars) - result["markdown"] = truncated_content - result["content_truncated"] = True - result["original_length"] = len(content) - result["truncated_length"] = len(truncated_content) - result["truncation_note"] = f"Content truncated to stay under MCP 25k token limit. Original: {len(content):,} chars, Shown: {len(truncated_content):,} chars. Use smaller page ranges for full content." - else: - result["markdown"] = content - result["content_truncated"] = False - - result["metadata"]["character_count"] = len(content) - result["metadata"]["word_count"] = len(content.split()) - - # Add image info - if include_images and markdown_result.get("images"): - result["images"] = markdown_result["images"] - result["metadata"]["image_count"] = len(markdown_result["images"]) - result["metadata"]["total_image_size"] = sum( - img.get("size_bytes", 0) for img in markdown_result["images"] - ) - - # Add structure info - if preserve_structure and markdown_result.get("structure"): - result["structure"] = markdown_result["structure"] - - return result - - except Exception as e: - if DEBUG: - import traceback - traceback.print_exc() - raise OfficeFileError(f"Markdown conversion failed: {str(e)}") - - -@app.tool() -async def get_supported_formats() -> dict[str, Any]: - """Get list of all supported Office document formats and their capabilities.""" - extensions = get_supported_extensions() - - format_details = {} - for ext in extensions: - from .utils.validation import get_format_info - info = get_format_info(ext) - if info: - format_details[ext] = { - "format_name": info["format_name"], - "category": info["category"], - "mime_types": info["mime_types"] - } - - return { - "supported_extensions": extensions, - "format_details": format_details, - "categories": { - "word": [ext for ext, info in format_details.items() if info["category"] == "word"], - "excel": [ext for ext, info in format_details.items() if info["category"] == "excel"], - "powerpoint": [ext for ext, info in format_details.items() if info["category"] == "powerpoint"] - }, - "total_formats": len(extensions) - } - - -# Helper functions for text extraction async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: """Extract text from Word documents with fallback methods.""" + from ..utils import OfficeFileError + methods_tried = [] # Method selection @@ -543,7 +97,6 @@ async def _extract_word_text(file_path: str, extension: str, preserve_formatting # Very basic text extraction attempt text = content.decode('utf-8', errors='ignore') # Clean up binary artifacts - import re text = re.sub(r'[^\x20-\x7E\n\r\t]', '', text) text = '\n'.join(line.strip() for line in text.split('\n') if line.strip()) method_used = "olefile" @@ -567,181 +120,14 @@ async def _extract_word_text(file_path: str, extension: str, 
preserve_formatting } -async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: - """Extract text from Excel documents.""" - methods_tried = [] - - if extension == ".csv": - # CSV handling - import pandas as pd - try: - df = pd.read_csv(file_path) - text = df.to_string() - return { - "text": text, - "method_used": "pandas", - "methods_tried": ["pandas"], - "formatted_sections": [{"type": "table", "data": df.to_dict()}] if preserve_formatting else [] - } - except Exception as e: - raise OfficeFileError(f"CSV processing failed: {str(e)}") - - # Excel file handling - text = "" - formatted_sections = [] - method_used = None - - method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"] - - for method_name in method_order: - try: - methods_tried.append(method_name) - - if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]: - import openpyxl - wb = openpyxl.load_workbook(file_path, data_only=True) - - text_parts = [] - for sheet_name in wb.sheetnames: - ws = wb[sheet_name] - text_parts.append(f"Sheet: {sheet_name}") - - for row in ws.iter_rows(values_only=True): - row_text = "\t".join(str(cell) if cell is not None else "" for cell in row) - if row_text.strip(): - text_parts.append(row_text) - - if preserve_formatting: - formatted_sections.append({ - "type": "worksheet", - "name": sheet_name, - "data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()] - }) - - text = "\n".join(text_parts) - method_used = "openpyxl" - break - - elif method_name == "pandas": - import pandas as pd - - if extension in [".xlsx", ".xlsm"]: - dfs = pd.read_excel(file_path, sheet_name=None) - else: # .xls - dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd') - - text_parts = [] - for sheet_name, df in dfs.items(): - text_parts.append(f"Sheet: {sheet_name}") - text_parts.append(df.to_string()) - - if preserve_formatting: - formatted_sections.append({ - "type": "dataframe", - "name": sheet_name, - "data": df.to_dict() - }) - - text = "\n\n".join(text_parts) - method_used = "pandas" - break - - elif method_name == "xlrd" and extension == ".xls": - import xlrd - wb = xlrd.open_workbook(file_path) - - text_parts = [] - for sheet in wb.sheets(): - text_parts.append(f"Sheet: {sheet.name}") - - for row_idx in range(sheet.nrows): - row = sheet.row_values(row_idx) - row_text = "\t".join(str(cell) for cell in row) - text_parts.append(row_text) - - text = "\n".join(text_parts) - method_used = "xlrd" - break - - except ImportError: - continue - except Exception: - continue - - if not method_used: - raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}") - - return { - "text": text, - "method_used": method_used, - "methods_tried": methods_tried, - "formatted_sections": formatted_sections - } - - -async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: - """Extract text from PowerPoint documents.""" - methods_tried = [] - - if extension == ".pptx": - try: - import pptx - prs = pptx.Presentation(file_path) - - text_parts = [] - formatted_sections = [] - - for slide_num, slide in enumerate(prs.slides, 1): - slide_text_parts = [] - - for shape in slide.shapes: - if hasattr(shape, "text") and shape.text: - slide_text_parts.append(shape.text) - - slide_text = "\n".join(slide_text_parts) - text_parts.append(f"Slide {slide_num}:\n{slide_text}") - - if 
preserve_formatting: - formatted_sections.append({ - "type": "slide", - "number": slide_num, - "text": slide_text, - "shapes": len(slide.shapes) - }) - - text = "\n\n".join(text_parts) - - return { - "text": text, - "method_used": "python-pptx", - "methods_tried": ["python-pptx"], - "formatted_sections": formatted_sections - } - - except ImportError: - methods_tried.append("python-pptx") - except Exception: - methods_tried.append("python-pptx") - - # Legacy .ppt handling would require additional libraries - if extension == ".ppt": - raise OfficeFileError("Legacy PowerPoint (.ppt) text extraction requires additional setup") - - raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}") - - -# Helper functions for image extraction async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: """Extract images from Word documents.""" + from ..utils import OfficeFileError + images = [] if extension == ".docx": try: - import io - import zipfile - - from PIL import Image - with zipfile.ZipFile(file_path, 'r') as zip_file: # Look for images in media folder image_files = [f for f in zip_file.namelist() if f.startswith('word/media/')] @@ -775,120 +161,6 @@ async def _extract_word_images(file_path: str, extension: str, output_format: st return images -async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: - """Extract images from Excel documents.""" - images = [] - - if extension in [".xlsx", ".xlsm"]: - try: - import io - import zipfile - - from PIL import Image - - with zipfile.ZipFile(file_path, 'r') as zip_file: - # Look for images in media folder - image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')] - - for i, img_path in enumerate(image_files): - try: - img_data = zip_file.read(img_path) - img = Image.open(io.BytesIO(img_data)) - - # Size filtering - if img.width >= min_width and img.height >= min_height: - # Save to temp file - temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}") - img.save(temp_path, format=output_format.upper()) - - images.append({ - "index": i, - "filename": os.path.basename(img_path), - "path": temp_path, - "width": img.width, - "height": img.height, - "format": img.format, - "size_bytes": len(img_data) - }) - except Exception: - continue - - except Exception as e: - raise OfficeFileError(f"Excel image extraction failed: {str(e)}") - - return images - - -async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: - """Extract images from PowerPoint documents.""" - images = [] - - if extension == ".pptx": - try: - import io - import zipfile - - from PIL import Image - - with zipfile.ZipFile(file_path, 'r') as zip_file: - # Look for images in media folder - image_files = [f for f in zip_file.namelist() if f.startswith('ppt/media/')] - - for i, img_path in enumerate(image_files): - try: - img_data = zip_file.read(img_path) - img = Image.open(io.BytesIO(img_data)) - - # Size filtering - if img.width >= min_width and img.height >= min_height: - # Save to temp file - temp_path = os.path.join(TEMP_DIR, f"powerpoint_image_{i}.{output_format}") - img.save(temp_path, format=output_format.upper()) - - images.append({ - "index": i, - "filename": os.path.basename(img_path), - "path": temp_path, - "width": img.width, - "height": img.height, - "format": img.format, 
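The Word image extractor keeps the same zipfile-plus-Pillow approach after the move; only its import path changes. A minimal usage sketch, assuming the mcp_office_tools.utils re-exports, a hypothetical report.docx, and result fields that mirror the Excel/PowerPoint extractors shown in this diff:

# Sketch: extracting embedded images from a .docx via the relocated helper.
# File name is hypothetical; saved images land in the configured temp directory,
# and the dict keys are assumed to match the Excel/PowerPoint extractors above.
import asyncio

from mcp_office_tools.utils import _extract_word_images

async def demo() -> None:
    images = await _extract_word_images("report.docx", ".docx", "png", 200, 200)
    for img in images:
        print(img["filename"], f'{img["width"]}x{img["height"]}', "->", img["path"])

asyncio.run(demo())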
- "size_bytes": len(img_data) - }) - except Exception: - continue - - except Exception as e: - raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}") - - return images - - -# Helper functions for metadata extraction -async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]: - """Extract basic metadata from Office documents.""" - metadata = {"category": category, "extension": extension} - - try: - if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]: - import zipfile - - with zipfile.ZipFile(file_path, 'r') as zip_file: - # Core properties - if 'docProps/core.xml' in zip_file.namelist(): - zip_file.read('docProps/core.xml').decode('utf-8') - metadata["has_core_properties"] = True - - # App properties - if 'docProps/app.xml' in zip_file.namelist(): - zip_file.read('docProps/app.xml').decode('utf-8') - metadata["has_app_properties"] = True - - except Exception: - pass - - return metadata - - async def _extract_word_metadata(file_path: str, extension: str) -> dict[str, Any]: """Extract Word-specific metadata.""" metadata = {"type": "word", "extension": extension} @@ -923,134 +195,6 @@ async def _extract_word_metadata(file_path: str, extension: str) -> dict[str, An return metadata -async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]: - """Extract Excel-specific metadata.""" - metadata = {"type": "excel", "extension": extension} - - if extension in [".xlsx", ".xlsm"]: - try: - import openpyxl - wb = openpyxl.load_workbook(file_path) - - props = wb.properties - metadata.update({ - "title": props.title, - "creator": props.creator, - "subject": props.subject, - "description": props.description, - "keywords": props.keywords, - "created": str(props.created) if props.created else None, - "modified": str(props.modified) if props.modified else None - }) - - # Workbook structure - metadata.update({ - "worksheet_count": len(wb.worksheets), - "worksheet_names": wb.sheetnames, - "has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets), - "has_images": any(len(ws._images) > 0 for ws in wb.worksheets) - }) - - except Exception: - pass - - return metadata - - -async def _extract_powerpoint_metadata(file_path: str, extension: str) -> dict[str, Any]: - """Extract PowerPoint-specific metadata.""" - metadata = {"type": "powerpoint", "extension": extension} - - if extension == ".pptx": - try: - import pptx - prs = pptx.Presentation(file_path) - - core_props = prs.core_properties - metadata.update({ - "title": core_props.title, - "author": core_props.author, - "subject": core_props.subject, - "keywords": core_props.keywords, - "comments": core_props.comments, - "created": str(core_props.created) if core_props.created else None, - "modified": str(core_props.modified) if core_props.modified else None - }) - - # Presentation structure - slide_layouts = set() - total_shapes = 0 - - for slide in prs.slides: - slide_layouts.add(slide.slide_layout.name) - total_shapes += len(slide.shapes) - - metadata.update({ - "slide_count": len(prs.slides), - "slide_layouts": list(slide_layouts), - "total_shapes": total_shapes, - "slide_width": prs.slide_width, - "slide_height": prs.slide_height - }) - - except Exception: - pass - - return metadata - - -def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int: - """Calculate document health score (1-10).""" - score = 10 - - # Deduct for validation errors - if not validation["is_valid"]: - score -= 5 - - if 
validation["errors"]: - score -= len(validation["errors"]) * 2 - - if validation["warnings"]: - score -= len(validation["warnings"]) - - # Deduct for problematic characteristics - if validation.get("password_protected"): - score -= 1 - - if format_info.get("is_legacy"): - score -= 1 - - structure = format_info.get("structure", {}) - if structure.get("estimated_complexity") == "complex": - score -= 1 - - return max(1, min(10, score)) - - -def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]: - """Get health improvement recommendations.""" - recommendations = [] - - if validation["errors"]: - recommendations.append("Fix validation errors before processing") - - if validation.get("password_protected"): - recommendations.append("Remove password protection if possible") - - if format_info.get("is_legacy"): - recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)") - - structure = format_info.get("structure", {}) - if structure.get("estimated_complexity") == "complex": - recommendations.append("Complex document may require specialized processing") - - if not recommendations: - recommendations.append("Document appears healthy and ready for processing") - - return recommendations - - -# Markdown conversion helper functions async def _convert_docx_to_markdown( file_path: str, include_images: bool, @@ -1064,12 +208,10 @@ async def _convert_docx_to_markdown( chapter_name: str = "" ) -> dict[str, Any]: """Convert .docx file to markdown with comprehensive feature support.""" - import base64 - # ULTRA-FAST summary mode - skip all complex processing if summary_only: return await _get_ultra_fast_summary(file_path) - + # If page_numbers, bookmark_name, or chapter_name is specified, we need to use python-docx for targeted extraction # as mammoth processes the entire document if page_numbers or bookmark_name or chapter_name: @@ -1077,7 +219,7 @@ async def _convert_docx_to_markdown( file_path, include_images, image_mode, max_image_size, preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name ) - + try: # Try mammoth first for better HTML->Markdown conversion (full document only) import mammoth @@ -1179,7 +321,7 @@ async def _convert_docx_to_markdown( if summary_only and len(markdown_content) > 5000: # For summary mode, truncate large content markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]" - + # Update the conversion result conversion_result["content"] = markdown_content @@ -1216,9 +358,12 @@ async def _convert_docx_with_python_docx( bookmark_name: str = "", chapter_name: str = "" ) -> dict[str, Any]: - """Convert .docx using python-docx with custom markdown conversion.""" - import base64 + """Convert .docx using python-docx with custom markdown conversion. + CRITICAL FIX: Lines 1305-1309 contain the page range fix that allows proper + extraction of large page ranges by calculating limits based on NUMBER of pages + requested, not the maximum page number. + """ import docx from docx.oxml.table import CT_Tbl from docx.oxml.text.paragraph import CT_P @@ -1290,7 +435,7 @@ async def _convert_docx_with_python_docx( if not chapter_range: return { "content": f"Chapter '{chapter_name}' not found in document. 
Available headings will be listed in processing_limits.", - "method_used": "python-docx-chapter-not-found", + "method_used": "python-docx-chapter-not-found", "images": [], "chapter_error": True, "available_headings": await _get_available_headings(doc) @@ -1299,7 +444,7 @@ async def _convert_docx_with_python_docx( max_chars = 100000 bookmark_range = None elif page_numbers: - # For page ranges, allow sufficient content for requested pages + # CRITICAL FIX: For page ranges, allow sufficient content for requested pages # Pages can vary wildly in paragraph count (some have 250+ paragraphs) # Base limits on NUMBER of pages requested, not max page number num_pages_requested = len(page_numbers) @@ -1314,7 +459,7 @@ async def _convert_docx_with_python_docx( max_chars = 200000 bookmark_range = None chapter_range = None - + current_page = 1 processed_paragraphs = 0 total_chars = 0 @@ -1338,13 +483,13 @@ async def _convert_docx_with_python_docx( # Early termination if we've processed enough content if processed_paragraphs >= max_paragraphs or total_chars >= max_chars: break - + # Skip elements outside bookmark/chapter range if targeted extraction is used if bookmark_range and not (bookmark_range['start_idx'] <= element_idx <= bookmark_range['end_idx']): continue if chapter_range and not (chapter_range['start_idx'] <= element_idx <= chapter_range['end_idx']): continue - + if isinstance(element, CT_P): paragraph = Paragraph(element, doc) @@ -1391,7 +536,7 @@ async def _convert_docx_with_python_docx( "page": current_page } structure_info["headings"].append(heading_info) - + # Add to table of contents table_of_contents.append({ "level": level, @@ -1413,7 +558,7 @@ async def _convert_docx_with_python_docx( table_length = len(table_markdown) if total_chars + table_length > max_chars: break # Stop processing - + markdown_parts.append(table_markdown) total_chars += table_length structure_info["tables"] += 1 @@ -1431,11 +576,11 @@ async def _convert_docx_with_python_docx( "method_used": "python-docx-custom", "images": images_info } - + # Add table of contents for navigation if table_of_contents: result["table_of_contents"] = _optimize_toc_page_ranges(table_of_contents) - + # Add processing limits info result["processing_limits"] = { "max_paragraphs_allowed": max_paragraphs, @@ -1445,7 +590,7 @@ async def _convert_docx_with_python_docx( "content_truncated": processed_paragraphs >= max_paragraphs or total_chars >= max_chars, "note": f"Processed {processed_paragraphs}/{max_paragraphs} paragraphs, {total_chars:,}/{max_chars:,} chars" } - + # Add extraction method info if bookmark_name and bookmark_range: result["bookmark_extraction"] = { @@ -1466,7 +611,7 @@ async def _convert_docx_with_python_docx( # Handle summary mode if summary_only and len(markdown_content) > 5000: markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]" - + # Update the result content result["content"] = markdown_content @@ -1488,6 +633,8 @@ async def _convert_doc_to_markdown( output_dir: str ) -> dict[str, Any]: """Convert legacy .doc file to markdown using available methods.""" + from ..utils import OfficeFileError + try: import mammoth @@ -1501,10 +648,10 @@ async def _convert_doc_to_markdown( "images": [] # Legacy .doc image extraction is complex } - # Handle summary mode + # Handle summary mode if summary_only and len(markdown_content) > 5000: markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]" - + # Update the conversion 
result conversion_result["content"] = markdown_content @@ -1520,6 +667,307 @@ async def _convert_doc_to_markdown( raise OfficeFileError(f"Legacy .doc conversion failed: {str(e)}") +async def _get_ultra_fast_summary(file_path: str) -> dict[str, Any]: + """Ultra-fast summary that extracts minimal data to prevent MCP token limits.""" + try: + import docx + doc = docx.Document(file_path) + + # Extract only the first few paragraphs and major headings + content_parts = [] + heading_count = 0 + paragraph_count = 0 + max_content_length = 2000 # Very short limit + current_length = 0 + + # Get basic structure info quickly + total_paragraphs = len(doc.paragraphs) + total_tables = len(doc.tables) + + # Extract bookmarks (chapter markers) + bookmarks = [] + try: + # Access document's bookmarks through the XML + for bookmark in doc.element.xpath('//w:bookmarkStart', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): + bookmark_name = bookmark.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name') + if bookmark_name and not bookmark_name.startswith('_'): # Skip system bookmarks + bookmarks.append(bookmark_name) + except Exception: + pass # Bookmarks extraction failed, continue without + + # Extract just a few key headings and the start of content + for para in doc.paragraphs[:50]: # Only check first 50 paragraphs + text = para.text.strip() + if not text: + continue + + # Check if it's a heading (simple heuristic) + is_heading = (para.style and "heading" in para.style.name.lower()) or len(text) < 100 + + if is_heading and heading_count < 10: # Max 10 headings + content_parts.append(f"# {text}") + heading_count += 1 + current_length += len(text) + 3 + elif paragraph_count < 5 and current_length < max_content_length: # Max 5 paragraphs + content_parts.append(text) + paragraph_count += 1 + current_length += len(text) + + if current_length > max_content_length: + break + + # Create very basic summary + summary_content = "\n\n".join(content_parts) + + # Extract available headings for chapter navigation + available_headings = await _get_available_headings(doc) + + return { + "content": summary_content, + "method_used": "ultra-fast-summary", + "table_of_contents": { + "note": "Use full document processing for detailed TOC", + "basic_info": f"Document has ~{total_paragraphs} paragraphs, {total_tables} tables, {heading_count} headings found in first scan", + "bookmarks": bookmarks[:20] if bookmarks else [], # Limit to first 20 bookmarks + "bookmark_count": len(bookmarks), + "bookmark_note": "Bookmarks often indicate chapter starts. Use these as navigation hints for page_range extraction.", + "available_headings": available_headings[:10] if available_headings else [], # Limit to first 10 headings + "heading_count": len(available_headings), + "heading_note": "Use these headings with chapter_name parameter for chapter-based extraction when bookmarks are not available." 
+ } + } + + except Exception as e: + return { + "content": f"Error creating summary: {str(e)}", + "method_used": "error-fallback", + "table_of_contents": {"note": "Summary generation failed"} + } + + +async def _find_bookmark_content_range(doc, bookmark_name: str) -> dict[str, Any]: + """Find the content range for a specific bookmark.""" + try: + # Find bookmark start and end positions in the document + bookmark_starts = {} + bookmark_ends = {} + + # Look for bookmark markers in the document XML + for elem_idx, element in enumerate(doc.element.body): + # Look for bookmark start markers + for bookmark_start in element.xpath('.//w:bookmarkStart', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): + name = bookmark_start.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name') + if name == bookmark_name: + bookmark_id = bookmark_start.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}id') + bookmark_starts[bookmark_id] = elem_idx + + # Look for bookmark end markers + for bookmark_end in element.xpath('.//w:bookmarkEnd', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): + bookmark_id = bookmark_end.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}id') + if bookmark_id in bookmark_starts: + bookmark_ends[bookmark_id] = elem_idx + break + + # Find the bookmark range + for bookmark_id, start_idx in bookmark_starts.items(): + if bookmark_id in bookmark_ends: + end_idx = bookmark_ends[bookmark_id] + # Extend range to capture full sections (look for next major heading) + extended_end = min(end_idx + 50, len(doc.element.body) - 1) # Extend by 50 elements or end of doc + return { + 'start_idx': start_idx, + 'end_idx': extended_end, + 'bookmark_id': bookmark_id, + 'note': f"Extracting content from bookmark '{bookmark_name}' (elements {start_idx}-{extended_end})" + } + + return None # Bookmark not found + + except Exception: + return None # Error finding bookmark + + +async def _find_chapter_content_range(doc, chapter_name: str) -> dict[str, Any]: + """Find the content range for a specific chapter by heading text.""" + try: + # Find heading that matches the chapter name + chapter_start_idx = None + chapter_end_idx = None + + # Search through document elements for matching heading + for elem_idx, element in enumerate(doc.element.body): + # Check if this element is a paragraph with heading style + try: + para = element + if para.tag.endswith('}p'): # Word paragraph element + # Get the text content + text_content = ''.join(text_elem.text or '' for text_elem in para.xpath('.//w:t', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})) + + # Check if this matches our chapter name (case insensitive, flexible matching) + if text_content.strip() and chapter_name.lower() in text_content.lower().strip(): + # Check if it's actually a heading by looking at paragraph style + style_elem = para.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}) + if style_elem: + style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '') + if 'heading' in style_val.lower() or 'title' in style_val.lower(): + chapter_start_idx = elem_idx + break + # Also consider short text lines as potential headings + elif len(text_content.strip()) < 100: + chapter_start_idx = elem_idx + break + except Exception: + continue + + if chapter_start_idx is None: + return None # Chapter heading not found + + # Find the 
end of this chapter (next major heading or end of document) + chapter_end_idx = len(doc.element.body) - 1 # Default to end of document + + # Look for the next major heading to determine chapter end + for elem_idx in range(chapter_start_idx + 1, len(doc.element.body)): + try: + para = doc.element.body[elem_idx] + if para.tag.endswith('}p'): + # Check if this is a major heading (same level or higher than chapter start) + style_elem = para.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}) + if style_elem: + style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '') + if 'heading1' in style_val.lower() or 'title' in style_val.lower(): + chapter_end_idx = elem_idx - 1 + break + except Exception: + continue + + return { + 'start_idx': chapter_start_idx, + 'end_idx': chapter_end_idx, + 'chapter_name': chapter_name, + 'note': f"Extracting content for chapter '{chapter_name}' (elements {chapter_start_idx}-{chapter_end_idx})" + } + + except Exception: + return None # Error finding chapter + + +async def _get_available_headings(doc) -> list[str]: + """Extract available headings from the document to help users find chapter names. + + CRITICAL FIX: Line 1804 ensures we scan ALL elements (not just first 100) + while still limiting results to 30 headings to prevent token issues. + """ + try: + headings = [] + + # Search through ALL document elements for headings (not limited to first 100) + # This ensures we find chapters at the end of long documents + for element in doc.element.body: + # Early exit if we have enough headings + if len(headings) >= 30: + break + + try: + if element.tag.endswith('}p'): # Word paragraph element + # Get the text content + text_content = ''.join(text_elem.text or '' for text_elem in element.xpath('.//w:t', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})) + + if text_content.strip(): + # Check if it's a heading by looking at paragraph style + style_elem = element.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}) + if style_elem: + style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '') + if 'heading' in style_val.lower() or 'title' in style_val.lower(): + headings.append(text_content.strip()[:100]) # Limit heading length + # Also consider short text lines as potential headings + elif len(text_content.strip()) < 100: + # Only add if it looks like a heading (not just short random text) + if any(word in text_content.lower() for word in ['chapter', 'section', 'part', 'introduction', 'conclusion']): + headings.append(text_content.strip()) + except Exception: + continue + + return headings[:20] # Return max 20 headings to avoid token issues + + except Exception: + return [] + + +def _has_page_break(paragraph) -> bool: + """Check if a paragraph contains a page break.""" + try: + # Check for explicit page breaks in paragraph runs + for run in paragraph.runs: + if run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') is not None: + br_elem = run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') + if br_elem is not None and br_elem.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}type') == 'page': + return True + return False + except Exception: + return False + + +async def _analyze_document_size(file_path: str, extension: str) -> dict[str, Any]: + """Analyze document to estimate size 
and complexity.""" + analysis = { + "estimated_pages": 1, + "file_size_mb": 0, + "complexity": "simple", + "estimated_content_size": "small" + } + + try: + # Get file size + file_size = Path(file_path).stat().st_size + analysis["file_size_mb"] = round(file_size / (1024 * 1024), 2) + + if extension == ".docx": + try: + import docx + doc = docx.Document(file_path) + + # Estimate pages based on content + paragraph_count = len(doc.paragraphs) + table_count = len(doc.tables) + + # Rough estimation: ~40 paragraphs per page + estimated_pages = max(1, paragraph_count // 40) + analysis["estimated_pages"] = estimated_pages + + # Determine complexity + if table_count > 10 or paragraph_count > 500: + analysis["complexity"] = "complex" + elif table_count > 5 or paragraph_count > 200: + analysis["complexity"] = "moderate" + + # Estimate content size + if estimated_pages > 20: + analysis["estimated_content_size"] = "very_large" + elif estimated_pages > 10: + analysis["estimated_content_size"] = "large" + elif estimated_pages > 5: + analysis["estimated_content_size"] = "medium" + + except Exception: + # Fallback to file size estimation + if file_size > 5 * 1024 * 1024: # 5MB + analysis["estimated_pages"] = 50 + analysis["estimated_content_size"] = "very_large" + elif file_size > 1 * 1024 * 1024: # 1MB + analysis["estimated_pages"] = 20 + analysis["estimated_content_size"] = "large" + elif file_size > 500 * 1024: # 500KB + analysis["estimated_pages"] = 10 + analysis["estimated_content_size"] = "medium" + + except Exception: + pass + + return analysis + + +# Helper functions for markdown conversion + def _paragraph_to_markdown(paragraph, preserve_structure: bool) -> str: """Convert a Word paragraph to markdown format.""" text = paragraph.text.strip() @@ -1534,7 +982,6 @@ def _paragraph_to_markdown(paragraph, preserve_structure: bool) -> str: if "heading" in style_name: # Extract heading level from style name - import re level_match = re.search(r'(\d+)', style_name) level = int(level_match.group(1)) if level_match else 1 return f"{'#' * level} {text}" @@ -1569,8 +1016,6 @@ def _table_to_markdown(table) -> str: def _html_to_markdown(html_content: str, preserve_structure: bool) -> str: """Convert HTML content to markdown format.""" - import re - # Basic HTML to Markdown conversions conversions = [ (r']*>(.*?)', r'# \1'), @@ -1605,48 +1050,8 @@ def _html_to_markdown(html_content: str, preserve_structure: bool) -> str: return markdown -def _chunk_markdown(content: str, chunk_size: int) -> list[dict[str, Any]]: - """Split markdown content into chunks while preserving structure.""" - chunks = [] - lines = content.split('\n') - current_chunk = [] - current_size = 0 - chunk_num = 1 - - for line in lines: - line_size = len(line) + 1 # +1 for newline - - # If adding this line would exceed chunk size and we have content - if current_size + line_size > chunk_size and current_chunk: - chunks.append({ - "chunk_number": chunk_num, - "content": '\n'.join(current_chunk), - "character_count": current_size, - "line_count": len(current_chunk) - }) - current_chunk = [] - current_size = 0 - chunk_num += 1 - - current_chunk.append(line) - current_size += line_size - - # Add final chunk if there's remaining content - if current_chunk: - chunks.append({ - "chunk_number": chunk_num, - "content": '\n'.join(current_chunk), - "character_count": current_size, - "line_count": len(current_chunk) - }) - - return chunks - - def _extract_markdown_structure(content: str) -> dict[str, Any]: """Extract structure information from markdown 
content.""" - import re - structure = { "headings": [], "lists": 0, @@ -1686,271 +1091,6 @@ def _extract_markdown_structure(content: str) -> dict[str, Any]: return structure -async def _find_bookmark_content_range(doc, bookmark_name: str) -> dict[str, Any]: - """Find the content range for a specific bookmark.""" - try: - # Find bookmark start and end positions in the document - bookmark_starts = {} - bookmark_ends = {} - - # Look for bookmark markers in the document XML - for elem_idx, element in enumerate(doc.element.body): - # Look for bookmark start markers - for bookmark_start in element.xpath('.//w:bookmarkStart', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): - name = bookmark_start.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name') - if name == bookmark_name: - bookmark_id = bookmark_start.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}id') - bookmark_starts[bookmark_id] = elem_idx - - # Look for bookmark end markers - for bookmark_end in element.xpath('.//w:bookmarkEnd', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): - bookmark_id = bookmark_end.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}id') - if bookmark_id in bookmark_starts: - bookmark_ends[bookmark_id] = elem_idx - break - - # Find the bookmark range - for bookmark_id, start_idx in bookmark_starts.items(): - if bookmark_id in bookmark_ends: - end_idx = bookmark_ends[bookmark_id] - # Extend range to capture full sections (look for next major heading) - extended_end = min(end_idx + 50, len(doc.element.body) - 1) # Extend by 50 elements or end of doc - return { - 'start_idx': start_idx, - 'end_idx': extended_end, - 'bookmark_id': bookmark_id, - 'note': f"Extracting content from bookmark '{bookmark_name}' (elements {start_idx}-{extended_end})" - } - - return None # Bookmark not found - - except Exception: - return None # Error finding bookmark - - -async def _find_chapter_content_range(doc, chapter_name: str) -> dict[str, Any]: - """Find the content range for a specific chapter by heading text.""" - try: - # Find heading that matches the chapter name - chapter_start_idx = None - chapter_end_idx = None - - # Search through document elements for matching heading - for elem_idx, element in enumerate(doc.element.body): - # Check if this element is a paragraph with heading style - try: - para = element - if para.tag.endswith('}p'): # Word paragraph element - # Get the text content - text_content = ''.join(text_elem.text or '' for text_elem in para.xpath('.//w:t', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})) - - # Check if this matches our chapter name (case insensitive, flexible matching) - if text_content.strip() and chapter_name.lower() in text_content.lower().strip(): - # Check if it's actually a heading by looking at paragraph style - style_elem = para.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}) - if style_elem: - style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '') - if 'heading' in style_val.lower() or 'title' in style_val.lower(): - chapter_start_idx = elem_idx - break - # Also consider short text lines as potential headings - elif len(text_content.strip()) < 100: - chapter_start_idx = elem_idx - break - except Exception: - continue - - if chapter_start_idx is None: - return None # Chapter heading not found - - # Find the end of this chapter (next 
major heading or end of document) - chapter_end_idx = len(doc.element.body) - 1 # Default to end of document - - # Look for the next major heading to determine chapter end - for elem_idx in range(chapter_start_idx + 1, len(doc.element.body)): - try: - para = doc.element.body[elem_idx] - if para.tag.endswith('}p'): - # Check if this is a major heading (same level or higher than chapter start) - style_elem = para.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}) - if style_elem: - style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '') - if 'heading1' in style_val.lower() or 'title' in style_val.lower(): - chapter_end_idx = elem_idx - 1 - break - except Exception: - continue - - return { - 'start_idx': chapter_start_idx, - 'end_idx': chapter_end_idx, - 'chapter_name': chapter_name, - 'note': f"Extracting content for chapter '{chapter_name}' (elements {chapter_start_idx}-{chapter_end_idx})" - } - - except Exception: - return None # Error finding chapter - - -async def _get_available_headings(doc) -> list[str]: - """Extract available headings from the document to help users find chapter names.""" - try: - headings = [] - - # Search through ALL document elements for headings (not limited to first 100) - # This ensures we find chapters at the end of long documents - for element in doc.element.body: - # Early exit if we have enough headings - if len(headings) >= 30: - break - - try: - if element.tag.endswith('}p'): # Word paragraph element - # Get the text content - text_content = ''.join(text_elem.text or '' for text_elem in element.xpath('.//w:t', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'})) - - if text_content.strip(): - # Check if it's a heading by looking at paragraph style - style_elem = element.xpath('.//w:pStyle', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}) - if style_elem: - style_val = style_elem[0].get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}val', '') - if 'heading' in style_val.lower() or 'title' in style_val.lower(): - headings.append(text_content.strip()[:100]) # Limit heading length - # Also consider short text lines as potential headings - elif len(text_content.strip()) < 100: - # Only add if it looks like a heading (not just short random text) - if any(word in text_content.lower() for word in ['chapter', 'section', 'part', 'introduction', 'conclusion']): - headings.append(text_content.strip()) - except Exception: - continue - - return headings[:20] # Return max 20 headings to avoid token issues - - except Exception: - return [] - - -async def _get_ultra_fast_summary(file_path: str) -> dict[str, Any]: - """Ultra-fast summary that extracts minimal data to prevent MCP token limits.""" - try: - import docx - doc = docx.Document(file_path) - - # Extract only the first few paragraphs and major headings - content_parts = [] - heading_count = 0 - paragraph_count = 0 - max_content_length = 2000 # Very short limit - current_length = 0 - - # Get basic structure info quickly - total_paragraphs = len(doc.paragraphs) - total_tables = len(doc.tables) - - # Extract bookmarks (chapter markers) - bookmarks = [] - try: - # Access document's bookmarks through the XML - for bookmark in doc.element.xpath('//w:bookmarkStart', namespaces={'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main'}): - bookmark_name = bookmark.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name') 
- if bookmark_name and not bookmark_name.startswith('_'): # Skip system bookmarks - bookmarks.append(bookmark_name) - except Exception: - pass # Bookmarks extraction failed, continue without - - # Extract just a few key headings and the start of content - for para in doc.paragraphs[:50]: # Only check first 50 paragraphs - text = para.text.strip() - if not text: - continue - - # Check if it's a heading (simple heuristic) - is_heading = (para.style and "heading" in para.style.name.lower()) or len(text) < 100 - - if is_heading and heading_count < 10: # Max 10 headings - content_parts.append(f"# {text}") - heading_count += 1 - current_length += len(text) + 3 - elif paragraph_count < 5 and current_length < max_content_length: # Max 5 paragraphs - content_parts.append(text) - paragraph_count += 1 - current_length += len(text) - - if current_length > max_content_length: - break - - # Create very basic summary - summary_content = "\n\n".join(content_parts) - - # Extract available headings for chapter navigation - available_headings = await _get_available_headings(doc) - - return { - "content": summary_content, - "method_used": "ultra-fast-summary", - "table_of_contents": { - "note": "Use full document processing for detailed TOC", - "basic_info": f"Document has ~{total_paragraphs} paragraphs, {total_tables} tables, {heading_count} headings found in first scan", - "bookmarks": bookmarks[:20] if bookmarks else [], # Limit to first 20 bookmarks - "bookmark_count": len(bookmarks), - "bookmark_note": "Bookmarks often indicate chapter starts. Use these as navigation hints for page_range extraction.", - "available_headings": available_headings[:10] if available_headings else [], # Limit to first 10 headings - "heading_count": len(available_headings), - "heading_note": "Use these headings with chapter_name parameter for chapter-based extraction when bookmarks are not available." 
- } - } - - except Exception as e: - return { - "content": f"Error creating summary: {str(e)}", - "method_used": "error-fallback", - "table_of_contents": {"note": "Summary generation failed"} - } - - -def _smart_truncate_content(content: str, max_chars: int) -> str: - """Intelligently truncate content while preserving structure and readability.""" - if len(content) <= max_chars: - return content - - lines = content.split('\n') - truncated_lines = [] - current_length = 0 - - # Try to preserve structure by stopping at a natural break point - for line in lines: - line_length = len(line) + 1 # +1 for newline - - # If adding this line would exceed limit - if current_length + line_length > max_chars: - # Try to find a good stopping point - if truncated_lines: - # Check if we're in the middle of a section - last_lines = '\n'.join(truncated_lines[-3:]) if len(truncated_lines) >= 3 else '\n'.join(truncated_lines) - - # If we stopped mid-paragraph, remove incomplete paragraph - if not (line.strip() == '' or line.startswith('#') or line.startswith('|')): - # Remove lines until we hit a natural break - while truncated_lines and not ( - truncated_lines[-1].strip() == '' or - truncated_lines[-1].startswith('#') or - truncated_lines[-1].startswith('|') or - truncated_lines[-1].startswith('-') or - truncated_lines[-1].startswith('*') - ): - truncated_lines.pop() - break - - truncated_lines.append(line) - current_length += line_length - - # Add truncation notice - result = '\n'.join(truncated_lines) - result += f"\n\n---\n**[CONTENT TRUNCATED]**\nShowing {len(result):,} of {len(content):,} characters.\nUse smaller page ranges (e.g., 3-5 pages) for full content without truncation.\n---" - - return result - - def _estimate_section_length(heading_level: int) -> int: """Estimate how many pages a section might span based on heading level.""" # Higher level headings (H1) tend to have longer sections @@ -1971,7 +1111,7 @@ def _optimize_toc_page_ranges(toc_entries: list) -> dict[str, Any]: "total_sections": len(toc_entries), "suggested_chunking": [] } - + for i, entry in enumerate(toc_entries): # Calculate actual end page based on next heading or document end if i + 1 < len(toc_entries): @@ -1980,7 +1120,7 @@ def _optimize_toc_page_ranges(toc_entries: list) -> dict[str, Any]: else: # Last section - use estimated length actual_end_page = entry["page"] + _estimate_section_length(entry["level"]) - + optimized_entry = { "level": entry["level"], "title": entry["title"], @@ -1990,17 +1130,17 @@ def _optimize_toc_page_ranges(toc_entries: list) -> dict[str, Any]: "section_type": _classify_section_type(entry["level"], entry["title"]) } optimized_toc["sections"].append(optimized_entry) - + # Generate chunking suggestions optimized_toc["suggested_chunking"] = _generate_chunking_suggestions(optimized_toc["sections"]) - + return optimized_toc def _classify_section_type(level: int, title: str) -> str: """Classify section type based on level and title patterns.""" title_lower = title.lower() - + if level == 1: if any(word in title_lower for word in ["chapter", "part", "section"]): return "chapter" @@ -2022,10 +1162,10 @@ def _generate_chunking_suggestions(sections: list) -> list[dict[str, Any]]: current_chunk_pages = 0 chunk_start = 1 chunk_sections = [] - + for section in sections: section_pages = section["estimated_end_page"] - section["start_page"] + 1 - + # If adding this section would make chunk too large, finalize current chunk # Use smaller chunks (8 pages) to prevent MCP token limit issues if current_chunk_pages + 
section_pages > 8 and chunk_sections: @@ -2034,10 +1174,10 @@ def _generate_chunking_suggestions(sections: list) -> list[dict[str, Any]]: "page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}", "sections_included": [s["title"] for s in chunk_sections], "estimated_pages": current_chunk_pages, - "description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" + + "description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" + (f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "") }) - + # Start new chunk chunk_start = section["start_page"] current_chunk_pages = section_pages @@ -2046,7 +1186,7 @@ def _generate_chunking_suggestions(sections: list) -> list[dict[str, Any]]: # Add to current chunk current_chunk_pages += section_pages chunk_sections.append(section) - + # Add final chunk if any sections remain if chunk_sections: suggestions.append({ @@ -2054,196 +1194,8 @@ def _generate_chunking_suggestions(sections: list) -> list[dict[str, Any]]: "page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}", "sections_included": [s["title"] for s in chunk_sections], "estimated_pages": current_chunk_pages, - "description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" + + "description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" + (f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "") }) - + return suggestions - - -def _has_page_break(paragraph) -> bool: - """Check if a paragraph contains a page break.""" - try: - # Check for explicit page breaks in paragraph runs - for run in paragraph.runs: - if run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') is not None: - br_elem = run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') - if br_elem is not None and br_elem.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}type') == 'page': - return True - return False - except Exception: - return False - - -def _parse_page_range(page_range: str) -> list[int]: - """Parse page range string into list of page numbers. 
- - Examples: - "1-5" -> [1, 2, 3, 4, 5] - "1,3,5" -> [1, 3, 5] - "1-3,5,7-9" -> [1, 2, 3, 5, 7, 8, 9] - """ - pages = set() - - for part in page_range.split(','): - part = part.strip() - if '-' in part: - # Handle range like "1-5" - start, end = part.split('-', 1) - try: - start_num = int(start.strip()) - end_num = int(end.strip()) - pages.update(range(start_num, end_num + 1)) - except ValueError: - continue - else: - # Handle single page like "3" - try: - pages.add(int(part)) - except ValueError: - continue - - return sorted(list(pages)) - - -async def _analyze_document_size(file_path: str, extension: str) -> dict[str, Any]: - """Analyze document to estimate size and complexity.""" - analysis = { - "estimated_pages": 1, - "file_size_mb": 0, - "complexity": "simple", - "estimated_content_size": "small" - } - - try: - # Get file size - from pathlib import Path - file_size = Path(file_path).stat().st_size - analysis["file_size_mb"] = round(file_size / (1024 * 1024), 2) - - if extension == ".docx": - try: - import docx - doc = docx.Document(file_path) - - # Estimate pages based on content - paragraph_count = len(doc.paragraphs) - table_count = len(doc.tables) - - # Rough estimation: ~40 paragraphs per page - estimated_pages = max(1, paragraph_count // 40) - analysis["estimated_pages"] = estimated_pages - - # Determine complexity - if table_count > 10 or paragraph_count > 500: - analysis["complexity"] = "complex" - elif table_count > 5 or paragraph_count > 200: - analysis["complexity"] = "moderate" - - # Estimate content size - if estimated_pages > 20: - analysis["estimated_content_size"] = "very_large" - elif estimated_pages > 10: - analysis["estimated_content_size"] = "large" - elif estimated_pages > 5: - analysis["estimated_content_size"] = "medium" - - except Exception: - # Fallback to file size estimation - if file_size > 5 * 1024 * 1024: # 5MB - analysis["estimated_pages"] = 50 - analysis["estimated_content_size"] = "very_large" - elif file_size > 1 * 1024 * 1024: # 1MB - analysis["estimated_pages"] = 20 - analysis["estimated_content_size"] = "large" - elif file_size > 500 * 1024: # 500KB - analysis["estimated_pages"] = 10 - analysis["estimated_content_size"] = "medium" - - except Exception: - pass - - return analysis - - -def _get_processing_recommendation( - doc_analysis: dict[str, Any], - page_range: str, - summary_only: bool -) -> dict[str, Any]: - """Generate intelligent processing recommendations based on document analysis.""" - - estimated_pages = doc_analysis["estimated_pages"] - content_size = doc_analysis["estimated_content_size"] - - recommendation = { - "status": "optimal", - "message": "", - "suggested_workflow": [], - "warnings": [] - } - - # Large document recommendations - if content_size in ["large", "very_large"] and not page_range and not summary_only: - recommendation["status"] = "suboptimal" - recommendation["message"] = ( - f"⚠️ Large document detected ({estimated_pages} estimated pages). " - "Consider using recommended workflow for better performance." - ) - recommendation["suggested_workflow"] = [ - "1. First: Call with summary_only=true to get document overview and TOC", - "2. Then: Use page_range to process specific sections (e.g., '1-5', '6-10', '15-20')", - "3. Recommended: Use 3-8 page chunks to stay under 25k token MCP limit", - "4. 
The tool auto-truncates if content is too large, but smaller ranges work better" - ] - recommendation["warnings"] = [ - "Page ranges >8 pages may hit 25k token response limit and get truncated", - "Use smaller page ranges (3-5 pages) for dense content documents", - "Auto-truncation preserves structure but loses content completeness" - ] - - # Medium document recommendations - elif content_size == "medium" and not page_range and not summary_only: - recommendation["status"] = "caution" - recommendation["message"] = ( - f"Medium document detected ({estimated_pages} estimated pages). " - "Consider summary_only=true first if you encounter response size issues." - ) - recommendation["suggested_workflow"] = [ - "Option 1: Try full processing (current approach)", - "Option 2: Use summary_only=true first, then page_range if needed" - ] - - # Optimal usage patterns - elif summary_only: - recommendation["message"] = "✅ Excellent! Using summary mode for initial document analysis." - recommendation["suggested_workflow"] = [ - "After reviewing summary, use page_range to extract specific sections of interest" - ] - - elif page_range and content_size in ["large", "very_large"]: - recommendation["message"] = "✅ Perfect! Using page-range processing for efficient extraction." - - elif content_size == "small": - recommendation["message"] = "✅ Small document - full processing is optimal." - - return recommendation - - -def main(): - """Main entry point for the MCP server.""" - import sys - - if len(sys.argv) > 1 and sys.argv[1] == "--version": - from . import __version__ - print(f"MCP Office Tools v{__version__}") - return - - # Run the FastMCP server - # CRITICAL: show_banner=False is required for stdio transport! - # FastMCP's banner prints ASCII art to stdout which breaks JSON-RPC protocol - app.run(show_banner=False) - - -if __name__ == "__main__": - main()
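The media helpers in this change (_extract_word_images together with the removed _extract_excel_images and _extract_powerpoint_images) all follow one pattern: open the OOXML package as a zip archive, list every entry under the format's media folder, and keep only the images that pass the Pillow size filter. A consolidated sketch of that shared pattern follows; the extract_ooxml_images name and the _MEDIA_PREFIX mapping are illustrative only, not part of the package.

import io
import os
import tempfile
import zipfile
from typing import Any

from PIL import Image

# Illustrative mapping of document category to its OOXML media folder.
_MEDIA_PREFIX = {"word": "word/media/", "excel": "xl/media/", "powerpoint": "ppt/media/"}


def extract_ooxml_images(file_path: str, category: str, output_format: str = "png",
                         min_width: int = 1, min_height: int = 1) -> list[dict[str, Any]]:
    """Shared zip-and-filter pattern used by the per-format image helpers."""
    images: list[dict[str, Any]] = []
    prefix = _MEDIA_PREFIX[category]
    with zipfile.ZipFile(file_path, "r") as zip_file:
        media_entries = [name for name in zip_file.namelist() if name.startswith(prefix)]
        for i, img_path in enumerate(media_entries):
            img_data = zip_file.read(img_path)
            try:
                img = Image.open(io.BytesIO(img_data))
            except Exception:
                continue  # skip media Pillow cannot decode (e.g. EMF/WMF)
            if img.width < min_width or img.height < min_height:
                continue  # same size filtering as the helpers above
            temp_path = os.path.join(tempfile.gettempdir(), f"{category}_image_{i}.{output_format}")
            img.save(temp_path, format=output_format.upper())
            images.append({
                "index": i,
                "filename": os.path.basename(img_path),
                "path": temp_path,
                "width": img.width,
                "height": img.height,
                "format": img.format,
                "size_bytes": len(img_data),
            })
    return images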
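_extract_basic_metadata only records whether docProps/core.xml and docProps/app.xml exist in the package. If the Dublin Core fields themselves are ever wanted without pulling in a heavier library, they can be read straight from core.xml. The sketch below uses the standard OOXML core-properties namespaces; read_core_properties is an illustration, not an existing helper in this codebase.

import xml.etree.ElementTree as ET
import zipfile
from typing import Any

_CORE_NS = {
    "cp": "http://schemas.openxmlformats.org/package/2006/metadata/core-properties",
    "dc": "http://purl.org/dc/elements/1.1/",
    "dcterms": "http://purl.org/dc/terms/",
}


def read_core_properties(file_path: str) -> dict[str, Any]:
    """Read a few Dublin Core fields from docProps/core.xml, if present."""
    with zipfile.ZipFile(file_path, "r") as zip_file:
        if "docProps/core.xml" not in zip_file.namelist():
            return {}
        root = ET.fromstring(zip_file.read("docProps/core.xml"))

    def text(tag):
        elem = root.find(tag, _CORE_NS)
        return elem.text if elem is not None else None

    return {
        "title": text("dc:title"),
        "creator": text("dc:creator"),
        "subject": text("dc:subject"),
        "keywords": text("cp:keywords"),
        "created": text("dcterms:created"),
        "modified": text("dcterms:modified"),
    }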
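The CRITICAL FIX called out in _convert_docx_with_python_docx sizes the paragraph and character budgets from how many pages were requested, not from the highest page number, so a late range such as 90-95 gets the same budget as 1-6 and end-of-document chapters can be extracted in full. The exact multipliers are not visible in this hunk; the constants below are placeholders that only illustrate the rule.

# Placeholder multipliers; the real values live in _convert_docx_with_python_docx.
PARAGRAPHS_PER_PAGE = 250
CHARS_PER_PAGE = 20_000


def page_range_budget(page_numbers: list[int]) -> tuple[int, int]:
    """Budgets scale with len(page_numbers), never with max(page_numbers)."""
    num_pages_requested = len(page_numbers)
    return num_pages_requested * PARAGRAPHS_PER_PAGE, num_pages_requested * CHARS_PER_PAGE


# Both requests cover six pages, so both get the same budget.
assert page_range_budget(list(range(90, 96))) == page_range_budget(list(range(1, 7)))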
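_get_processing_recommendation steers callers with large documents toward a two-pass flow: a summary_only pass to obtain the lightweight TOC, bookmark and heading hints, then page-range extractions of roughly 3 to 8 pages. A sketch of that flow, assuming both helpers are importable from mcp_office_tools.utils as the mixin imports indicate; arguments are passed positionally in the order the Word mixin uses, the image-related values are placeholders, and an empty page_numbers list means no page filtering.

from typing import Any

from mcp_office_tools.utils import _convert_docx_to_markdown, _parse_page_range


async def summarize_then_extract(path: str) -> dict[str, Any]:
    # Pass 1: ultra-fast summary (returns basic_info, bookmarks and available_headings).
    overview = await _convert_docx_to_markdown(
        path,       # file_path
        False,      # include_images
        "base64",   # image_mode (placeholder value)
        1_000_000,  # max_image_size (placeholder value)
        True,       # preserve_structure
        [],         # page_numbers: empty list = whole-document path
        True,       # summary_only
        "",         # output_dir
    )

    # Pass 2: extract a small chunk; "1-3,5,7-9" would parse to [1, 2, 3, 5, 7, 8, 9].
    pages = _parse_page_range("1-5")
    chunk = await _convert_docx_to_markdown(
        path,
        False,
        "base64",
        1_000_000,
        True,
        pages,
        False,
        "",
    )
    return {"overview": overview, "chunk": chunk}


# Run with: asyncio.run(summarize_then_extract("large_report.docx"))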
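When chapter_name does not match any heading, the python-docx path returns chapter_error=True together with available_headings, so a caller can retry with one of the reported titles instead of failing outright. A minimal retry sketch under the same placeholder assumptions as the previous example.

from typing import Any

from mcp_office_tools.utils import _convert_docx_to_markdown


async def extract_chapter(path: str, chapter_name: str) -> dict[str, Any]:
    result = await _convert_docx_to_markdown(
        path, False, "base64", 1_000_000, True, [], False, "",
        bookmark_name="", chapter_name=chapter_name,
    )
    if result.get("chapter_error"):
        # The converter reports the headings it did find; retry with one of them.
        candidates = result.get("available_headings", [])
        if candidates:
            result = await _convert_docx_to_markdown(
                path, False, "base64", 1_000_000, True, [], False, "",
                bookmark_name="", chapter_name=candidates[0],
            )
    return result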