"""Word Document Tools Mixin - Specialized tools for Word document processing.""" import os import time from typing import Any, Optional from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool from pydantic import Field from ..utils import ( OfficeFileError, resolve_office_file_path, validate_office_file, detect_format, resolve_field_defaults, handle_office_errors ) from ..pagination import paginate_document_conversion, PaginationParams class WordMixin(MCPMixin): """Mixin containing Word-specific tools for advanced document processing.""" @mcp_tool( name="convert_to_markdown", description="Convert Office documents to Markdown format with intelligent processing and automatic pagination for large documents. ⚠️ LARGE DOCUMENT HANDLING: Documents exceeding 25k tokens are automatically paginated into manageable sections. Use cursor_id to continue through pages. For massive documents (200+ pages), pagination prevents token limit errors while preserving document structure and context." ) @handle_office_errors("Markdown conversion") @resolve_field_defaults( include_images=True, image_mode="base64", max_image_size=1024*1024, preserve_structure=True, page_range="", bookmark_name="", chapter_name="", summary_only=False, output_dir="", limit=50, cursor_id=None, session_id=None, return_all=False ) async def convert_to_markdown( self, file_path: str = Field(description="Path to Office document or URL"), include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"), image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"), max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"), preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"), page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). RECOMMENDED for large documents. Empty = all pages"), bookmark_name: str = Field(default="", description="Extract content for a specific bookmark/chapter (e.g., 'Chapter1_Start'). More reliable than page ranges."), chapter_name: str = Field(default="", description="Extract content for a chapter by heading text (e.g., 'Chapter 1', 'Introduction'). Works when bookmarks aren't available."), summary_only: bool = Field(default=False, description="Return only metadata and truncated summary. 
STRONGLY RECOMMENDED for large docs (>10 pages)"), output_dir: str = Field(default="", description="Output directory for image files (if image_mode='files')"), # Pagination parameters limit: int = Field(default=50, description="Maximum number of document sections to return per page"), cursor_id: Optional[str] = Field(default=None, description="Cursor ID for pagination continuation"), session_id: Optional[str] = Field(default=None, description="Session ID for pagination isolation"), return_all: bool = Field(default=False, description="Return entire document bypassing pagination (WARNING: may exceed token limits)") ) -> dict[str, Any]: start_time = time.time() # Resolve file path local_path = await resolve_office_file_path(file_path) # Validate file validation = await validate_office_file(local_path) if not validation["is_valid"]: raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") # Get format info format_info = await detect_format(local_path) category = format_info["category"] extension = format_info["extension"] # Currently focused on Word documents for markdown conversion if category != "word": raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}") # Analyze document size and provide intelligent recommendations doc_analysis = await self._analyze_document_size(local_path, extension) processing_recommendation = self._get_processing_recommendation( doc_analysis, page_range, summary_only ) # Parse page range if provided page_numbers = self._parse_page_range(page_range) if page_range else None # Prioritize bookmark/chapter extraction over page ranges if bookmark_name or chapter_name: page_numbers = None # Ignore page ranges when bookmark or chapter is specified # Convert to markdown based on format if extension == ".docx": markdown_result = await self._convert_docx_to_markdown( local_path, include_images, image_mode, max_image_size, preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name ) else: # .doc # For legacy .doc files, use mammoth if available markdown_result = await self._convert_doc_to_markdown( local_path, include_images, image_mode, max_image_size, preserve_structure, page_numbers, summary_only, output_dir ) # Check if pagination is needed markdown_content = markdown_result["content"] estimated_tokens = len(markdown_content) // 4 # Rough token estimation # Generate session ID if not provided if not session_id: session_id = f"word-{int(time.time())}-{os.getpid()}" # Create pagination parameters pagination_params = PaginationParams( limit=limit, cursor_id=cursor_id, session_id=session_id, return_all=return_all ) # Apply pagination if content is large or pagination is explicitly requested # Skip pagination only if return_all=True AND no cursor_id AND content is manageable should_paginate = (cursor_id or estimated_tokens > 25000 or (not return_all and estimated_tokens > 8000)) if should_paginate: paginated_result = paginate_document_conversion( tool_name="convert_to_markdown", document_path=local_path, markdown_content=markdown_content, params=pagination_params, session_id=session_id, total_estimated_tokens=estimated_tokens ) # If pagination was applied, return the paginated result if "pagination" in paginated_result: # Add metadata to the paginated result paginated_result["metadata"] = { "original_file": os.path.basename(local_path), "format": format_info["format_name"], "conversion_method": markdown_result["method_used"], "conversion_time": round(time.time() - start_time, 3), 
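        # Worked example of the heuristic above: a 100,000-character document
        # estimates to 100_000 // 4 = 25_000 tokens, which is exactly the
        # auto-pagination threshold applied below.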
"summary_only": summary_only, "document_analysis": doc_analysis, "processing_recommendation": processing_recommendation, "session_id": session_id } # Add additional metadata from original result if "images" in markdown_result: paginated_result["metadata"]["images_found"] = len(markdown_result["images"]) if "structure" in markdown_result: paginated_result["metadata"]["structure_preserved"] = bool(markdown_result["structure"]) return paginated_result # Build result based on mode (non-paginated or bypass pagination) result = { "metadata": { "original_file": os.path.basename(local_path), "format": format_info["format_name"], "conversion_method": markdown_result["method_used"], "conversion_time": round(time.time() - start_time, 3), "summary_only": summary_only, "document_analysis": doc_analysis, "processing_recommendation": processing_recommendation, "session_id": session_id, "estimated_tokens": estimated_tokens } } # Add page range info if used if page_range: result["metadata"]["page_range"] = page_range result["metadata"]["pages_processed"] = len(page_numbers) if page_numbers else 0 # Add content based on mode if summary_only: # VERY restrictive summary mode to prevent massive responses result["metadata"]["character_count"] = len(markdown_result["content"]) result["metadata"]["word_count"] = len(markdown_result["content"].split()) # Ultra-short summary (only 500 chars max) result["summary"] = markdown_result["content"][:500] + "..." if len(markdown_result["content"]) > 500 else markdown_result["content"] # Severely limit table of contents to prevent 1M+ token responses if "table_of_contents" in markdown_result: toc = markdown_result["table_of_contents"] if isinstance(toc, dict): # Keep only essential TOC info, severely truncated result["table_of_contents"] = { "note": toc.get("note", ""), "basic_info": toc.get("basic_info", "")[:200], # Limit to 200 chars } # Add bookmark/heading info if available (limit to first 5 items) if "bookmarks" in toc: result["table_of_contents"]["bookmarks"] = toc["bookmarks"][:5] result["table_of_contents"]["bookmark_count"] = toc.get("bookmark_count", 0) if "available_headings" in toc: result["table_of_contents"]["available_headings"] = toc["available_headings"][:5] result["table_of_contents"]["heading_count"] = toc.get("heading_count", 0) else: result["table_of_contents"] = {"note": "Summary mode - use full processing for detailed TOC"} else: # Full content mode result["markdown"] = markdown_result["content"] result["content_truncated"] = len(markdown_result["content"]) >= 200000 # Warn if near limit # Add images info if "images" in markdown_result: result["images"] = markdown_result["images"] # Add structure info if "structure" in markdown_result: result["structure"] = markdown_result["structure"] # Add table of contents if available if "table_of_contents" in markdown_result: result["table_of_contents"] = markdown_result["table_of_contents"] return result # Helper methods - import from monolithic server async def _analyze_document_size(self, file_path: str, extension: str) -> dict[str, Any]: """Analyze document size for processing recommendations.""" from ..server_monolithic import _analyze_document_size return await _analyze_document_size(file_path, extension) def _get_processing_recommendation(self, doc_analysis: dict[str, Any], page_range: str, summary_only: bool) -> dict[str, Any]: """Get processing recommendations based on document analysis.""" from ..server_monolithic import _get_processing_recommendation return _get_processing_recommendation(doc_analysis, 
    def _parse_page_range(self, page_range: str) -> list[int]:
        """Parse page range string into list of page numbers."""
        from ..server_monolithic import _parse_page_range
        return _parse_page_range(page_range)

    async def _convert_docx_to_markdown(
        self, file_path: str, include_images: bool, image_mode: str,
        max_image_size: int, preserve_structure: bool, page_numbers: list[int],
        summary_only: bool, output_dir: str, bookmark_name: str = "",
        chapter_name: str = ""
    ) -> dict[str, Any]:
        """Convert .docx to markdown."""
        from ..server_monolithic import _convert_docx_to_markdown
        return await _convert_docx_to_markdown(
            file_path, include_images, image_mode, max_image_size,
            preserve_structure, page_numbers, summary_only, output_dir,
            bookmark_name, chapter_name
        )

    async def _convert_doc_to_markdown(
        self, file_path: str, include_images: bool, image_mode: str,
        max_image_size: int, preserve_structure: bool, page_numbers: list[int],
        summary_only: bool, output_dir: str
    ) -> dict[str, Any]:
        """Convert legacy .doc to markdown."""
        from ..server_monolithic import _convert_doc_to_markdown
        return await _convert_doc_to_markdown(
            file_path, include_images, image_mode, max_image_size,
            preserve_structure, page_numbers, summary_only, output_dir
        )

    @mcp_tool(
        name="extract_word_tables",
        description=(
            "Extract all tables from Word documents with structure, styling, and "
            "data conversion options. Returns tables as structured data with "
            "CSV/JSON export capability."
        )
    )
    @handle_office_errors("Table extraction")
    @resolve_field_defaults(
        include_styling=True,
        output_format="structured",
        preserve_merged_cells=True,
        include_headers=True,
    )
    async def extract_word_tables(
        self,
        file_path: str = Field(description="Path to Word document or URL"),
        include_styling: bool = Field(default=True, description="Include table styling information (borders, alignment, etc.)"),
        output_format: str = Field(default="structured", description="Output format: structured, csv, json, markdown"),
        preserve_merged_cells: bool = Field(default=True, description="Handle merged cells appropriately"),
        include_headers: bool = Field(default=True, description="Identify and mark header rows/columns"),
    ) -> dict[str, Any]:
        """Extract tables from Word documents with comprehensive structure analysis."""
        start_time = time.time()

        # Resolve and validate file
        resolved_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(resolved_path)
        if validation["category"] != "word":
            raise OfficeFileError(f"Table extraction requires a Word document, got: {validation['format_name']}")

        # Import required libraries
        import docx

        # Load document
        doc = docx.Document(resolved_path)

        tables_data = []
        table_index = 0

        for table in doc.tables:
            table_info = {
                "table_index": table_index,
                "dimensions": {
                    "rows": len(table.rows),
                    "columns": len(table.columns) if table.rows else 0,
                },
                "data": [],
                "metadata": {},
            }

            # Extract table styling if requested
            if include_styling:
                table_info["styling"] = {
                    "table_style": table.style.name if table.style else None,
                    "alignment": str(table.alignment) if hasattr(table, 'alignment') else None,
                }

            # Extract table data
            for row_idx, row in enumerate(table.rows):
                row_data = []
                row_styling = [] if include_styling else None

                for col_idx, cell in enumerate(row.cells):
                    cell_text = cell.text.strip()
                    cell_info = {"text": cell_text}

                    if include_styling:
                        cell_style = {
                            "bold": False,
                            "italic": False,
                            "alignment": None,
                        }
                        # Check text formatting in paragraphs
                        for paragraph in cell.paragraphs:
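                            # A cell may hold several paragraphs, each split into
                            # runs; a single bold or italic run is enough to flag
                            # the whole cell as bold/italic.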
                            for run in paragraph.runs:
                                if run.bold:
                                    cell_style["bold"] = True
                                if run.italic:
                                    cell_style["italic"] = True
                            if paragraph.alignment is not None:
                                cell_style["alignment"] = str(paragraph.alignment)

                        cell_info["styling"] = cell_style
                        row_styling.append(cell_style)

                    # Handle merged cells
                    if preserve_merged_cells:
                        # Basic merged cell detection (simplified)
                        cell_info["is_merged"] = len(cell.text.strip()) == 0 and col_idx > 0

                    row_data.append(cell_info)

                table_info["data"].append({
                    "row_index": row_idx,
                    "cells": row_data,
                    "styling": row_styling if include_styling else None,
                })

            # Identify headers if requested
            if include_headers and table_info["data"]:
                # Simple header detection: first row with all non-empty cells
                first_row_cells = table_info["data"][0]["cells"]
                if all(cell["text"] for cell in first_row_cells):
                    table_info["metadata"]["has_header_row"] = True
                    table_info["metadata"]["headers"] = [cell["text"] for cell in first_row_cells]
                else:
                    table_info["metadata"]["has_header_row"] = False

            # Convert to requested output format
            if output_format in ["csv", "json", "markdown"]:
                converted_data = self._convert_table_format(table_info, output_format)
                table_info["converted_output"] = converted_data

            tables_data.append(table_info)
            table_index += 1

        # Generate summary
        total_tables = len(tables_data)
        total_cells = sum(table["dimensions"]["rows"] * table["dimensions"]["columns"] for table in tables_data)

        return {
            "tables": tables_data,
            "summary": {
                "total_tables": total_tables,
                "total_cells": total_cells,
                "extraction_time": time.time() - start_time,
                "output_format": output_format,
                "file_info": validation,
            },
        }

    def _convert_table_format(self, table_info: dict, format_type: str) -> str:
        """Convert table data to specified format."""
        rows_data = []

        # Extract plain text data
        for row in table_info["data"]:
            row_texts = [cell["text"] for cell in row["cells"]]
            rows_data.append(row_texts)

        if format_type == "csv":
            output = io.StringIO()
            writer = csv.writer(output)
            writer.writerows(rows_data)
            return output.getvalue()

        elif format_type == "json":
            if table_info["metadata"].get("has_header_row", False):
                headers = rows_data[0]
                data_rows = rows_data[1:]
                json_data = [dict(zip(headers, row)) for row in data_rows]
            else:
                json_data = [{"col_" + str(i): cell for i, cell in enumerate(row)} for row in rows_data]
            return json.dumps(json_data, indent=2)

        elif format_type == "markdown":
            if not rows_data:
                return ""
            markdown = ""
            for i, row in enumerate(rows_data):
                # Escape pipe characters in cell content
                escaped_row = [cell.replace("|", "\\|") for cell in row]
                markdown += "| " + " | ".join(escaped_row) + " |\n"
                # Add separator after header row
                if i == 0 and table_info["metadata"].get("has_header_row", False):
                    markdown += "| " + " | ".join(["---"] * len(row)) + " |\n"
            return markdown

        return ""
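    # Illustrative _convert_table_format output for format_type="markdown" on a
    # 2x2 table whose first row was detected as a header (data is hypothetical):
    #   | Name | Role |
    #   | --- | --- |
    #   | Ada \| Bob | Engineers |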
    @mcp_tool(
        name="analyze_word_structure",
        description=(
            "Analyze Word document structure including headings, sections, page "
            "layout, and document hierarchy. Provides navigation map and content "
            "organization insights."
        )
    )
    @handle_office_errors("Structure analysis")
    @resolve_field_defaults(
        include_page_info=True,
        extract_outline=True,
        analyze_styles=True,
    )
    async def analyze_word_structure(
        self,
        file_path: str = Field(description="Path to Word document or URL"),
        include_page_info: bool = Field(default=True, description="Include page layout and section information"),
        extract_outline: bool = Field(default=True, description="Extract document outline and heading hierarchy"),
        analyze_styles: bool = Field(default=True, description="Analyze custom styles and formatting patterns"),
    ) -> dict[str, Any]:
        """Analyze Word document structure and organization."""
        start_time = time.time()

        # Resolve and validate file
        resolved_path = await resolve_office_file_path(file_path)
        validation = await validate_office_file(resolved_path)
        if validation["category"] != "word":
            raise OfficeFileError(f"Structure analysis requires a Word document, got: {validation['format_name']}")

        # Import required libraries
        import docx
        from docx.enum.style import WD_STYLE_TYPE

        # Load document
        doc = docx.Document(resolved_path)

        structure_info = {
            "document_info": {
                "total_paragraphs": len(doc.paragraphs),
                "total_tables": len(doc.tables),
                "total_sections": len(doc.sections),
            }
        }

        # Extract outline and headings
        if extract_outline:
            headings = []
            heading_styles = ['Heading 1', 'Heading 2', 'Heading 3', 'Heading 4', 'Heading 5', 'Heading 6']

            for para_idx, paragraph in enumerate(doc.paragraphs):
                if paragraph.style.name in heading_styles:
                    level = int(paragraph.style.name.split()[-1])
                    headings.append({
                        "text": paragraph.text.strip(),
                        "level": level,
                        "style": paragraph.style.name,
                        "paragraph_index": para_idx,
                    })

            structure_info["outline"] = {
                "headings": headings,
                "heading_count": len(headings),
                "max_depth": max([h["level"] for h in headings]) if headings else 0,
            }

            # Create navigation tree
            structure_info["navigation_tree"] = self._build_navigation_tree(headings)

        # Analyze page layout and sections
        if include_page_info:
            sections_info = []

            for section_idx, section in enumerate(doc.sections):
                section_info = {
                    "section_index": section_idx,
                    "page_dimensions": {},
                    "margins": {},
                }

                # Safely extract page dimensions
                try:
                    if section.page_width:
                        section_info["page_dimensions"]["width"] = float(section.page_width.inches)
                    if section.page_height:
                        section_info["page_dimensions"]["height"] = float(section.page_height.inches)
                except (ValueError, AttributeError, TypeError):
                    section_info["page_dimensions"] = {"width": None, "height": None}

                # Safely extract margins
                try:
                    if section.left_margin:
                        section_info["margins"]["left"] = float(section.left_margin.inches)
                    if section.right_margin:
                        section_info["margins"]["right"] = float(section.right_margin.inches)
                    if section.top_margin:
                        section_info["margins"]["top"] = float(section.top_margin.inches)
                    if section.bottom_margin:
                        section_info["margins"]["bottom"] = float(section.bottom_margin.inches)
                except (ValueError, AttributeError, TypeError):
                    section_info["margins"] = {"left": None, "right": None, "top": None, "bottom": None}

                # Safely extract orientation
                try:
                    if hasattr(section, 'orientation') and section.orientation is not None:
                        # orientation is an enum; get its name
                        section_info["orientation"] = (
                            section.orientation.name
                            if hasattr(section.orientation, 'name')
                            else str(section.orientation)
                        )
                    else:
                        section_info["orientation"] = None
                except (ValueError, AttributeError, TypeError):
                    section_info["orientation"] = None

                # Header and footer information
                try:
                    if section.header:
                        section_info["has_header"] = True
                        section_info["header_text"] = " ".join([p.text for p in section.header.paragraphs]).strip()
                except (ValueError, AttributeError, TypeError):
                    section_info["has_header"] = False

                try:
                    if section.footer:
                        section_info["has_footer"] = True
                        section_info["footer_text"] = " ".join([p.text for p in section.footer.paragraphs]).strip()
                except (ValueError, AttributeError, TypeError):
                    section_info["has_footer"] = False

                sections_info.append(section_info)

            structure_info["page_layout"] = sections_info
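        # Illustrative shape of one page_layout entry (values are hypothetical):
        #   {"section_index": 0, "orientation": "PORTRAIT",
        #    "page_dimensions": {"width": 8.5, "height": 11.0},
        #    "margins": {"left": 1.0, "right": 1.0, "top": 1.0, "bottom": 1.0},
        #    "has_header": True, "header_text": "Annual Report"}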
        # Analyze styles
        if analyze_styles:
            styles_info = {
                "paragraph_styles": [],
                "character_styles": [],
                "table_styles": [],
                "style_usage": {},
            }

            # Collect style information
            for style in doc.styles:
                style_info = {
                    "name": style.name,
                    "type": str(style.type),
                    "builtin": style.builtin,
                }

                if style.type == WD_STYLE_TYPE.PARAGRAPH:
                    styles_info["paragraph_styles"].append(style_info)
                elif style.type == WD_STYLE_TYPE.CHARACTER:
                    styles_info["character_styles"].append(style_info)
                elif style.type == WD_STYLE_TYPE.TABLE:
                    styles_info["table_styles"].append(style_info)

            # Analyze style usage
            style_usage = {}
            for paragraph in doc.paragraphs:
                style_name = paragraph.style.name
                style_usage[style_name] = style_usage.get(style_name, 0) + 1

            styles_info["style_usage"] = style_usage
            structure_info["styles"] = styles_info

        return {
            "structure": structure_info,
            "analysis_time": time.time() - start_time,
            "file_info": validation,
        }

    def _build_navigation_tree(self, headings: list) -> list:
        """Build hierarchical navigation tree from headings."""
        if not headings:
            return []

        tree = []
        stack = []  # Stack to keep track of parent nodes

        for heading in headings:
            node = {
                "text": heading["text"],
                "level": heading["level"],
                "paragraph_index": heading["paragraph_index"],
                "children": [],
            }

            # Find the correct parent level
            while stack and stack[-1]["level"] >= heading["level"]:
                stack.pop()

            if stack:
                # Add as child to the parent
                stack[-1]["children"].append(node)
            else:
                # Add as root level
                tree.append(node)

            stack.append(node)

        return tree
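    # Worked example for _build_navigation_tree (hypothetical headings):
    #   input:  [H1 "Intro", H2 "Scope", H2 "Terms", H1 "Design"]
    #   output: [Intro [Scope, Terms], Design]
    # "Design" (level 1) pops both level-2 nodes and "Intro" off the stack
    # before being appended as a new root.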