"""Universal Office Tools Mixin - Format-agnostic tools that work across all Office document types.""" import time from typing import Any from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool from pydantic import Field from ..utils import ( OfficeFileError, classify_document_type, detect_format, get_supported_extensions, resolve_office_file_path, validate_office_file, ) from ..resources import resource_store, EmbeddedResource, ResourceStore class UniversalMixin(MCPMixin): """Mixin containing format-agnostic tools that work across Word, Excel, PowerPoint, and CSV files.""" @mcp_tool( name="extract_text", description="Extract text content from Office documents with intelligent method selection. Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt), and CSV files. Uses multi-library fallback for maximum compatibility." ) async def extract_text( self, file_path: str = Field(description="Path to Office document or URL"), preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"), include_metadata: bool = Field(default=True, description="Include document metadata in output"), method: str = Field(default="auto", description="Extraction method: auto, primary, fallback") ) -> dict[str, Any]: start_time = time.time() try: # Resolve file path (download if URL) local_path = await resolve_office_file_path(file_path) # Validate file validation = await validate_office_file(local_path) if not validation["is_valid"]: raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") # Get format info format_info = await detect_format(local_path) category = format_info["category"] extension = format_info["extension"] # Extract text based on category with fallback text_result = await self._extract_text_by_category(local_path, extension, category, preserve_formatting, method) # Build response result = { "text": text_result["text"], "metadata": { "original_file": file_path, "format": format_info["format_name"], "extraction_method": text_result["method_used"], "extraction_time": round(time.time() - start_time, 3), "methods_tried": text_result.get("methods_tried", [text_result["method_used"]]) } } # Add formatted sections if preserved if preserve_formatting and "formatted_sections" in text_result: result["structure"] = text_result["formatted_sections"] # Add metadata if requested if include_metadata: doc_metadata = await self._extract_basic_metadata(local_path, extension, category) result["document_metadata"] = doc_metadata return result except OfficeFileError: raise except Exception as e: raise OfficeFileError(f"Text extraction failed: {str(e)}") @mcp_tool( name="extract_images", description="Extract images from Office documents with size filtering and format conversion." ) async def extract_images( self, file_path: str = Field(description="Path to Office document or URL"), min_width: int = Field(default=100, description="Minimum image width in pixels"), min_height: int = Field(default=100, description="Minimum image height in pixels"), output_format: str = Field(default="png", description="Output image format: png, jpg, jpeg"), include_metadata: bool = Field(default=True, description="Include image metadata") ) -> dict[str, Any]: start_time = time.time() try: # Resolve file path local_path = await resolve_office_file_path(file_path) # Validate file validation = await validate_office_file(local_path) if not validation["is_valid"]: raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") # Get format info format_info = await detect_format(local_path) category = format_info["category"] extension = format_info["extension"] # Extract images based on category images = await self._extract_images_by_category(local_path, extension, category, output_format, min_width, min_height) return { "images": images, "metadata": { "original_file": file_path, "format": format_info["format_name"], "image_count": len(images), "extraction_time": round(time.time() - start_time, 3), "filters_applied": { "min_width": min_width, "min_height": min_height, "output_format": output_format } } } except OfficeFileError: raise except Exception as e: raise OfficeFileError(f"Image extraction failed: {str(e)}") @mcp_tool( name="extract_metadata", description="Extract comprehensive metadata from Office documents." ) async def extract_metadata( self, file_path: str = Field(description="Path to Office document or URL") ) -> dict[str, Any]: start_time = time.time() try: # Resolve file path local_path = await resolve_office_file_path(file_path) # Validate file validation = await validate_office_file(local_path) if not validation["is_valid"]: raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") # Get format info format_info = await detect_format(local_path) category = format_info["category"] extension = format_info["extension"] # Extract metadata based on category metadata = await self._extract_metadata_by_category(local_path, extension, category) # Add extraction info metadata["extraction_info"] = { "extraction_time": round(time.time() - start_time, 3), "format_detected": format_info["format_name"] } return metadata except OfficeFileError: raise except Exception as e: raise OfficeFileError(f"Metadata extraction failed: {str(e)}") @mcp_tool( name="detect_office_format", description="Intelligent Office document format detection and analysis." ) async def detect_office_format( self, file_path: str = Field(description="Path to Office document or URL") ) -> dict[str, Any]: try: # Resolve file path local_path = await resolve_office_file_path(file_path) # Get comprehensive format detection format_info = await detect_format(local_path) # Add classification classification = await classify_document_type(local_path) format_info.update(classification) return format_info except Exception as e: raise OfficeFileError(f"Format detection failed: {str(e)}") @mcp_tool( name="analyze_document_health", description="Comprehensive document health and integrity analysis." ) async def analyze_document_health( self, file_path: str = Field(description="Path to Office document or URL") ) -> dict[str, Any]: start_time = time.time() try: # Resolve file path local_path = await resolve_office_file_path(file_path) # Validate file thoroughly validation = await validate_office_file(local_path) # Get format detection format_info = await detect_format(local_path) # Build health report health_report = { "overall_health": "healthy" if validation["is_valid"] else "unhealthy", "validation": validation, "format_info": format_info, "analysis_time": round(time.time() - start_time, 3) } # Add recommendations if not validation["is_valid"]: health_report["recommendations"] = [ "File validation failed - check for corruption", "Try opening file in native application", "Consider file recovery tools if data is critical" ] else: health_report["recommendations"] = [ "File appears healthy and readable", "All validation checks passed" ] return health_report except Exception as e: return { "overall_health": "error", "error": str(e), "analysis_time": round(time.time() - start_time, 3), "recommendations": [ "File could not be analyzed", "Check file path and permissions", "Verify file is not corrupted" ] } @mcp_tool( name="get_supported_formats", description="Get list of all supported Office document formats and their capabilities." ) async def get_supported_formats(self) -> dict[str, Any]: extensions = get_supported_extensions() format_details = {} for ext in extensions: if ext.startswith('.doc'): category = "word" legacy = ext == ".doc" elif ext.startswith('.xls') or ext == '.csv': category = "excel" legacy = ext == ".xls" elif ext.startswith('.ppt'): category = "powerpoint" legacy = ext == ".ppt" else: category = "other" legacy = False format_details[ext] = { "category": category, "legacy_format": legacy, "text_extraction": True, "image_extraction": ext != ".csv", "metadata_extraction": True, "markdown_conversion": category == "word" } return { "supported_extensions": extensions, "format_details": format_details, "categories": { "word": [ext for ext, info in format_details.items() if info["category"] == "word"], "excel": [ext for ext, info in format_details.items() if info["category"] == "excel"], "powerpoint": [ext for ext, info in format_details.items() if info["category"] == "powerpoint"] }, "total_formats": len(extensions) } # Helper methods - these will be imported from the original server.py async def _extract_text_by_category(self, file_path: str, extension: str, category: str, preserve_formatting: bool, method: str) -> dict[str, Any]: """Extract text based on document category.""" # Import the appropriate extraction function from ..utils import _extract_word_text, _extract_excel_text, _extract_powerpoint_text if category == "word": return await _extract_word_text(file_path, extension, preserve_formatting, method) elif category == "excel": return await _extract_excel_text(file_path, extension, preserve_formatting, method) elif category == "powerpoint": return await _extract_powerpoint_text(file_path, extension, preserve_formatting, method) else: raise OfficeFileError(f"Unsupported document category: {category}") async def _extract_images_by_category(self, file_path: str, extension: str, category: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: """Extract images based on document category.""" from ..utils import _extract_word_images, _extract_excel_images, _extract_powerpoint_images if category == "word": return await _extract_word_images(file_path, extension, output_format, min_width, min_height) elif category == "excel": return await _extract_excel_images(file_path, extension, output_format, min_width, min_height) elif category == "powerpoint": return await _extract_powerpoint_images(file_path, extension, output_format, min_width, min_height) else: return [] # CSV and other formats don't support images async def _extract_metadata_by_category(self, file_path: str, extension: str, category: str) -> dict[str, Any]: """Extract metadata based on document category.""" from ..utils import _extract_word_metadata, _extract_excel_metadata, _extract_powerpoint_metadata, _extract_basic_metadata # Get basic metadata first metadata = await _extract_basic_metadata(file_path, extension, category) # Add category-specific metadata if category == "word": specific_metadata = await _extract_word_metadata(file_path, extension) elif category == "excel": specific_metadata = await _extract_excel_metadata(file_path, extension) elif category == "powerpoint": specific_metadata = await _extract_powerpoint_metadata(file_path, extension) else: specific_metadata = {} metadata.update(specific_metadata) return metadata async def _extract_basic_metadata(self, file_path: str, extension: str, category: str) -> dict[str, Any]: """Extract basic metadata common to all documents.""" from ..utils import _extract_basic_metadata return await _extract_basic_metadata(file_path, extension, category) @mcp_tool( name="index_document", description="Scan and index all resources in a document (images, chapters, sheets, slides). Returns resource URIs that can be fetched individually. Use this before accessing resources via their URIs." ) async def index_document( self, file_path: str = Field(description="Path to Office document or URL"), include_images: bool = Field(default=True, description="Index embedded images"), include_chapters: bool = Field(default=True, description="Index chapters/sections (Word docs)"), include_sheets: bool = Field(default=True, description="Index sheets (Excel docs)"), include_slides: bool = Field(default=True, description="Index slides (PowerPoint docs)"), text_patterns_only: bool = Field(default=False, description="Ignore heading styles, detect chapters by 'Chapter X' text patterns only") ) -> dict[str, Any]: """Scan document and populate resource store with available content. Returns URIs for all indexed resources that can be fetched via MCP resources. """ start_time = time.time() # Resolve and validate local_path = await resolve_office_file_path(file_path) validation = await validate_office_file(local_path) if not validation["is_valid"]: raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") format_info = await detect_format(local_path) category = format_info["category"] extension = format_info["extension"] # Generate stable document ID doc_id = ResourceStore.get_doc_id(local_path) # Clear any existing resources for this doc resource_store.clear_document(doc_id) indexed = { "doc_id": doc_id, "file": file_path, "format": format_info["format_name"], "resources": {} } # Index images if include_images: try: images = await self._extract_images_by_category( local_path, extension, category, "png", 50, 50 ) for idx, img in enumerate(images): resource = EmbeddedResource( resource_id=str(idx), resource_type="image", mime_type=img.get("mime_type", "image/png"), data=img.get("data", b""), name=img.get("filename"), metadata={ "width": img.get("width"), "height": img.get("height"), "format": img.get("format", "png") } ) resource_store.store(doc_id, resource, local_path) indexed["resources"]["image"] = [ {"id": str(i), "uri": f"image://{doc_id}/{i}"} for i in range(len(images)) ] except Exception as e: indexed["resources"]["image"] = {"error": str(e)} # Index chapters (Word documents) if include_chapters and category == "word": try: chapters = await self._index_word_chapters(local_path, doc_id, text_patterns_only) indexed["resources"]["chapter"] = chapters except Exception as e: indexed["resources"]["chapter"] = {"error": str(e)} # Index sheets (Excel documents) if include_sheets and category == "excel": try: sheets = await self._index_excel_sheets(local_path, doc_id) indexed["resources"]["sheet"] = sheets except Exception as e: indexed["resources"]["sheet"] = {"error": str(e)} # Index slides (PowerPoint documents) if include_slides and category == "powerpoint": try: slides = await self._index_powerpoint_slides(local_path, doc_id) indexed["resources"]["slide"] = slides except Exception as e: indexed["resources"]["slide"] = {"error": str(e)} indexed["indexing_time"] = round(time.time() - start_time, 3) indexed["total_resources"] = sum( len(v) if isinstance(v, list) else 0 for v in indexed["resources"].values() ) return indexed async def _index_word_chapters(self, file_path: str, doc_id: str, text_patterns_only: bool = False) -> list[dict]: """Extract and index chapters/sections from a Word document. Detection strategy (in order): 1. Primary: Heading 1 styles (structured, reliable) → section://doc/N 2. Fallback: "Chapter X" text pattern (books, manuscripts) → chapter://doc/N If text_patterns_only=True, skips heading styles and uses only text patterns. """ import re from docx import Document doc = Document(file_path) chapters = [] current_section = None current_paragraphs = [] section_num = 0 # Detection patterns chapter_pattern = re.compile(r'^chapter\s*(\d+)', re.IGNORECASE) heading_styles = {'Heading 1', 'Heading1', 'Title', 'Titre', 'Überschrift 1'} def is_heading(para) -> bool: """Check if paragraph is a heading style.""" style_name = para.style.name if para.style else '' return style_name in heading_styles or style_name.startswith('Heading 1') def save_section(resource_type: str = "chapter"): nonlocal current_section, current_paragraphs, section_num if current_section is not None and current_paragraphs: # Convert to markdown markdown_lines = [] markdown_lines.append(f"# {current_section['title']}\n") for para in current_paragraphs: text = para.strip() if text: markdown_lines.append(text + "\n") content = "\n".join(markdown_lines) resource = EmbeddedResource( resource_id=str(current_section["number"]), resource_type=resource_type, mime_type="text/markdown", data=content, name=current_section["title"], metadata={ "word_count": len(content.split()), "paragraph_count": len(current_paragraphs) } ) resource_store.store(doc_id, resource, file_path) chapters.append({ "id": str(current_section["number"]), "title": current_section["title"], "uri": f"{resource_type}://{doc_id}/{current_section['number']}", "word_count": len(content.split()) }) # Primary: detect by Heading 1 styles (structured, reliable) # Skip if text_patterns_only=True (for messy docs with inconsistent styles) if not text_patterns_only: for para in doc.paragraphs: text = para.text.strip() if is_heading(para) and text: save_section("section") section_num += 1 current_section = { "number": section_num, "title": text[:100] } current_paragraphs = [] elif current_section is not None: current_paragraphs.append(text) save_section("section") # Fallback: try "Chapter X" text pattern (for docs without heading styles) if not chapters: current_section = None current_paragraphs = [] for para in doc.paragraphs: text = para.text.strip() match = chapter_pattern.match(text) if match: save_section("chapter") current_section = { "number": int(match.group(1)), "title": text[:100] } current_paragraphs = [] elif current_section is not None: current_paragraphs.append(text) save_section("chapter") return chapters async def _index_excel_sheets(self, file_path: str, doc_id: str) -> list[dict]: """Extract and index sheets from an Excel document.""" import openpyxl wb = openpyxl.load_workbook(file_path, data_only=True) sheets = [] for sheet_name in wb.sheetnames: ws = wb[sheet_name] # Convert to markdown table rows = [] for row in ws.iter_rows(values_only=True): row_data = [str(cell) if cell is not None else "" for cell in row] if any(row_data): # Skip empty rows rows.append(row_data) if not rows: continue # Build markdown table md_lines = [] md_lines.append("| " + " | ".join(rows[0]) + " |") md_lines.append("| " + " | ".join(["---"] * len(rows[0])) + " |") for row in rows[1:]: # Pad row if needed while len(row) < len(rows[0]): row.append("") md_lines.append("| " + " | ".join(row[:len(rows[0])]) + " |") content = "\n".join(md_lines) resource = EmbeddedResource( resource_id=sheet_name, resource_type="sheet", mime_type="text/markdown", data=content, name=sheet_name, metadata={ "rows": len(rows), "columns": len(rows[0]) if rows else 0 } ) resource_store.store(doc_id, resource, file_path) sheets.append({ "id": sheet_name, "name": sheet_name, "uri": f"sheet://{doc_id}/{sheet_name}", "rows": len(rows), "columns": len(rows[0]) if rows else 0 }) wb.close() return sheets async def _index_powerpoint_slides(self, file_path: str, doc_id: str) -> list[dict]: """Extract and index slides from a PowerPoint document.""" from pptx import Presentation prs = Presentation(file_path) slides = [] for idx, slide in enumerate(prs.slides): slide_num = idx + 1 # Extract text from shapes text_parts = [] title = None for shape in slide.shapes: if hasattr(shape, "text") and shape.text.strip(): if shape.is_placeholder and hasattr(shape, "placeholder_format"): if shape.placeholder_format.type == 1: # Title title = shape.text.strip() text_parts.append(shape.text.strip()) if not text_parts: continue # Build markdown md_lines = [] if title: md_lines.append(f"# Slide {slide_num}: {title}\n") else: md_lines.append(f"# Slide {slide_num}\n") for text in text_parts: if text != title: md_lines.append(text + "\n") content = "\n".join(md_lines) resource = EmbeddedResource( resource_id=str(slide_num), resource_type="slide", mime_type="text/markdown", data=content, name=title or f"Slide {slide_num}", metadata={ "slide_number": slide_num, "has_title": title is not None } ) resource_store.store(doc_id, resource, file_path) slides.append({ "id": str(slide_num), "title": title or f"Slide {slide_num}", "uri": f"slide://{doc_id}/{slide_num}" }) return slides