From b3caed78d393736ba1d99fa18912594b6d59a786 Mon Sep 17 00:00:00 2001 From: Ryan Malloy Date: Mon, 18 Aug 2025 23:23:59 -0600 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Add=20comprehensive=20Markdown=20co?= =?UTF-8?q?nversion=20with=20image=20support?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add convert_to_markdown tool for .docx/.doc files - Support multiple image handling modes (base64, files, references) - Implement large document chunking for performance - Preserve document structure (headings, lists, tables) - Smart fallback methods (mammoth → python-docx → custom) - Handle both modern and legacy Word formats --- src/mcp_office_tools/server.py | 905 ++++++++++++++++++++++++++------- 1 file changed, 724 insertions(+), 181 deletions(-) diff --git a/src/mcp_office_tools/server.py b/src/mcp_office_tools/server.py index cdb56ce..a293afd 100644 --- a/src/mcp_office_tools/server.py +++ b/src/mcp_office_tools/server.py @@ -4,23 +4,22 @@ FastMCP server providing 30+ tools for processing Word, Excel, PowerPoint docume including both modern formats (.docx, .xlsx, .pptx) and legacy formats (.doc, .xls, .ppt). """ -import time -import tempfile import os -from typing import Dict, Any, List, Optional, Union +import tempfile +import time from pathlib import Path +from typing import Any from fastmcp import FastMCP from pydantic import Field from .utils import ( OfficeFileError, - validate_office_file, - validate_office_path, - detect_format, classify_document_type, + detect_format, + get_supported_extensions, resolve_office_file_path, - get_supported_extensions + validate_office_file, ) # Initialize FastMCP app @@ -37,28 +36,28 @@ async def extract_text( preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"), include_metadata: bool = Field(default=True, description="Include document metadata in output"), method: str = Field(default="auto", description="Extraction method: auto, primary, fallback") -) -> Dict[str, Any]: +) -> dict[str, Any]: """Extract text content from Office documents with intelligent method selection. Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt), and CSV files. Uses multi-library fallback for maximum compatibility. 
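
    Returns a dict with "text" and "extraction_info" keys, plus "metadata" and
    "formatted_sections" when the corresponding options are enabled.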
""" start_time = time.time() - + try: # Resolve file path (download if URL) local_path = await resolve_office_file_path(file_path) - + # Validate file validation = await validate_office_file(local_path) if not validation["is_valid"]: raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - + # Get format info format_info = await detect_format(local_path) category = format_info["category"] extension = format_info["extension"] - + # Route to appropriate extraction method if category == "word": text_result = await _extract_word_text(local_path, extension, preserve_formatting, method) @@ -68,7 +67,7 @@ async def extract_text( text_result = await _extract_powerpoint_text(local_path, extension, preserve_formatting, method) else: raise OfficeFileError(f"Unsupported document category: {category}") - + # Compile results result = { "text": text_result["text"], @@ -82,15 +81,15 @@ async def extract_text( "is_legacy": format_info["is_legacy"] } } - + if include_metadata: result["metadata"] = await _extract_basic_metadata(local_path, extension, category) - + if preserve_formatting: result["formatted_sections"] = text_result.get("formatted_sections", []) - + return result - + except Exception as e: if DEBUG: import traceback @@ -105,24 +104,24 @@ async def extract_images( min_width: int = Field(default=100, description="Minimum image width in pixels"), min_height: int = Field(default=100, description="Minimum image height in pixels"), include_metadata: bool = Field(default=True, description="Include image metadata") -) -> Dict[str, Any]: +) -> dict[str, Any]: """Extract images from Office documents with size filtering and format conversion.""" start_time = time.time() - + try: # Resolve file path local_path = await resolve_office_file_path(file_path) - + # Validate file validation = await validate_office_file(local_path) if not validation["is_valid"]: raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - + # Get format info format_info = await detect_format(local_path) category = format_info["category"] extension = format_info["extension"] - + # Extract images based on format if category == "word": images = await _extract_word_images(local_path, extension, output_format, min_width, min_height) @@ -132,7 +131,7 @@ async def extract_images( images = await _extract_powerpoint_images(local_path, extension, output_format, min_width, min_height) else: raise OfficeFileError(f"Image extraction not supported for category: {category}") - + result = { "images": images, "image_count": len(images), @@ -142,12 +141,12 @@ async def extract_images( "category": category } } - + if include_metadata: result["total_size_bytes"] = sum(img.get("size_bytes", 0) for img in images) - + return result - + except Exception as e: if DEBUG: import traceback @@ -158,24 +157,24 @@ async def extract_images( @app.tool() async def extract_metadata( file_path: str = Field(description="Path to Office document or URL") -) -> Dict[str, Any]: +) -> dict[str, Any]: """Extract comprehensive metadata from Office documents.""" start_time = time.time() - + try: # Resolve file path local_path = await resolve_office_file_path(file_path) - + # Validate file validation = await validate_office_file(local_path) if not validation["is_valid"]: raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") - + # Get format info format_info = await detect_format(local_path) category = format_info["category"] extension = format_info["extension"] - + # Extract metadata based on format if category == "word": 
metadata = await _extract_word_metadata(local_path, extension) @@ -185,11 +184,11 @@ async def extract_metadata( metadata = await _extract_powerpoint_metadata(local_path, extension) else: metadata = {"category": category, "basic_info": "Limited metadata available"} - + # Add file system metadata path = Path(local_path) stat = path.stat() - + result = { "document_metadata": metadata, "file_metadata": { @@ -202,9 +201,9 @@ async def extract_metadata( "format_info": format_info, "extraction_time": round(time.time() - start_time, 3) } - + return result - + except Exception as e: if DEBUG: import traceback @@ -215,20 +214,20 @@ async def extract_metadata( @app.tool() async def detect_office_format( file_path: str = Field(description="Path to Office document or URL") -) -> Dict[str, Any]: +) -> dict[str, Any]: """Intelligent Office document format detection and analysis.""" start_time = time.time() - + try: # Resolve file path local_path = await resolve_office_file_path(file_path) - + # Detect format format_info = await detect_format(local_path) - + # Classify document classification = await classify_document_type(local_path) - + result = { "format_detection": format_info, "document_classification": classification, @@ -236,9 +235,9 @@ async def detect_office_format( "processing_recommendations": format_info.get("processing_hints", []), "detection_time": round(time.time() - start_time, 3) } - + return result - + except Exception as e: if DEBUG: import traceback @@ -249,25 +248,25 @@ async def detect_office_format( @app.tool() async def analyze_document_health( file_path: str = Field(description="Path to Office document or URL") -) -> Dict[str, Any]: +) -> dict[str, Any]: """Comprehensive document health and integrity analysis.""" start_time = time.time() - + try: # Resolve file path local_path = await resolve_office_file_path(file_path) - + # Validate file thoroughly validation = await validate_office_file(local_path) - + # Get format info format_info = await detect_format(local_path) - + # Health assessment health_score = _calculate_health_score(validation, format_info) - + result = { - "overall_health": "healthy" if validation["is_valid"] and health_score >= 8 else + "overall_health": "healthy" if validation["is_valid"] and health_score >= 8 else "warning" if health_score >= 5 else "problematic", "health_score": health_score, "validation_results": validation, @@ -275,9 +274,9 @@ async def analyze_document_health( "recommendations": _get_health_recommendations(validation, format_info), "analysis_time": round(time.time() - start_time, 3) } - + return result - + except Exception as e: if DEBUG: import traceback @@ -286,10 +285,96 @@ async def analyze_document_health( @app.tool() -async def get_supported_formats() -> Dict[str, Any]: +async def convert_to_markdown( + file_path: str = Field(description="Path to Office document or URL"), + include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"), + image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"), + max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"), + preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"), + chunk_size: int = Field(default=0, description="Split large documents into chunks (0 = no chunking)"), + output_dir: str = Field(default="", description="Output directory for image files (if 
image_mode='files')") +) -> dict[str, Any]: + """Convert Office documents to Markdown format with image support and structure preservation. + + Handles large .docx files efficiently with options for image embedding, file extraction, + and document chunking for very large files. + """ + start_time = time.time() + + try: + # Resolve file path + local_path = await resolve_office_file_path(file_path) + + # Validate file + validation = await validate_office_file(local_path) + if not validation["is_valid"]: + raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}") + + # Get format info + format_info = await detect_format(local_path) + category = format_info["category"] + extension = format_info["extension"] + + # Currently focused on Word documents for markdown conversion + if category != "word": + raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}") + + # Convert to markdown based on format + if extension == ".docx": + markdown_result = await _convert_docx_to_markdown( + local_path, include_images, image_mode, max_image_size, + preserve_structure, chunk_size, output_dir + ) + else: # .doc + # For legacy .doc files, use mammoth if available + markdown_result = await _convert_doc_to_markdown( + local_path, include_images, image_mode, max_image_size, + preserve_structure, chunk_size, output_dir + ) + + result = { + "markdown": markdown_result["content"], + "metadata": { + "original_file": os.path.basename(local_path), + "format": format_info["format_name"], + "conversion_method": markdown_result["method_used"], + "character_count": len(markdown_result["content"]), + "word_count": len(markdown_result["content"].split()), + "conversion_time": round(time.time() - start_time, 3) + } + } + + # Add chunking info if applicable + if chunk_size > 0 and markdown_result.get("chunks"): + result["chunks"] = markdown_result["chunks"] + result["metadata"]["chunk_count"] = len(markdown_result["chunks"]) + + # Add image info + if include_images and markdown_result.get("images"): + result["images"] = markdown_result["images"] + result["metadata"]["image_count"] = len(markdown_result["images"]) + result["metadata"]["total_image_size"] = sum( + img.get("size_bytes", 0) for img in markdown_result["images"] + ) + + # Add structure info + if preserve_structure and markdown_result.get("structure"): + result["structure"] = markdown_result["structure"] + + return result + + except Exception as e: + if DEBUG: + import traceback + traceback.print_exc() + raise OfficeFileError(f"Markdown conversion failed: {str(e)}") + + +@app.tool() +async def get_supported_formats() -> dict[str, Any]: """Get list of all supported Office document formats and their capabilities.""" extensions = get_supported_extensions() - + format_details = {} for ext in extensions: from .utils.validation import get_format_info @@ -300,7 +385,7 @@ async def get_supported_formats() -> Dict[str, Any]: "category": info["category"], "mime_types": info["mime_types"] } - + return { "supported_extensions": extensions, "format_details": format_details, @@ -314,10 +399,10 @@ async def get_supported_formats() -> Dict[str, Any]: # Helper functions for text extraction -async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> Dict[str, Any]: +async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: """Extract text from Word documents with fallback methods.""" methods_tried = [] - + # Method 
selection if method == "auto": if extension == ".docx": @@ -328,19 +413,19 @@ async def _extract_word_text(file_path: str, extension: str, preserve_formatting method_order = ["python-docx"] if extension == ".docx" else ["olefile"] else: # fallback method_order = ["mammoth", "docx2txt"] - + text = "" formatted_sections = [] method_used = None - + for method_name in method_order: try: methods_tried.append(method_name) - + if method_name == "python-docx" and extension == ".docx": import docx doc = docx.Document(file_path) - + paragraphs = [] for para in doc.paragraphs: paragraphs.append(para.text) @@ -350,14 +435,14 @@ async def _extract_word_text(file_path: str, extension: str, preserve_formatting "text": para.text, "style": para.style.name if para.style else None }) - + text = "\n".join(paragraphs) method_used = "python-docx" break - + elif method_name == "mammoth": import mammoth - + with open(file_path, "rb") as docx_file: if preserve_formatting: result = mammoth.convert_to_html(docx_file) @@ -369,16 +454,16 @@ async def _extract_word_text(file_path: str, extension: str, preserve_formatting else: result = mammoth.extract_raw_text(docx_file) text = result.value - + method_used = "mammoth" break - + elif method_name == "docx2txt": import docx2txt text = docx2txt.process(file_path) method_used = "docx2txt" break - + elif method_name == "olefile" and extension == ".doc": # Basic text extraction for legacy .doc files try: @@ -397,15 +482,15 @@ async def _extract_word_text(file_path: str, extension: str, preserve_formatting break except Exception: continue - + except ImportError: continue except Exception: continue - + if not method_used: raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}") - + return { "text": text, "method_used": method_used, @@ -414,10 +499,10 @@ async def _extract_word_text(file_path: str, extension: str, preserve_formatting } -async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> Dict[str, Any]: +async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: """Extract text from Excel documents.""" methods_tried = [] - + if extension == ".csv": # CSV handling import pandas as pd @@ -432,92 +517,92 @@ async def _extract_excel_text(file_path: str, extension: str, preserve_formattin } except Exception as e: raise OfficeFileError(f"CSV processing failed: {str(e)}") - + # Excel file handling text = "" formatted_sections = [] method_used = None - + method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"] - + for method_name in method_order: try: methods_tried.append(method_name) - + if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]: import openpyxl wb = openpyxl.load_workbook(file_path, data_only=True) - + text_parts = [] for sheet_name in wb.sheetnames: ws = wb[sheet_name] text_parts.append(f"Sheet: {sheet_name}") - + for row in ws.iter_rows(values_only=True): row_text = "\t".join(str(cell) if cell is not None else "" for cell in row) if row_text.strip(): text_parts.append(row_text) - + if preserve_formatting: formatted_sections.append({ "type": "worksheet", "name": sheet_name, "data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()] }) - + text = "\n".join(text_parts) method_used = "openpyxl" break - + elif method_name == "pandas": import pandas as pd - + if extension in [".xlsx", ".xlsm"]: dfs = pd.read_excel(file_path, 
sheet_name=None) else: # .xls dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd') - + text_parts = [] for sheet_name, df in dfs.items(): text_parts.append(f"Sheet: {sheet_name}") text_parts.append(df.to_string()) - + if preserve_formatting: formatted_sections.append({ "type": "dataframe", "name": sheet_name, "data": df.to_dict() }) - + text = "\n\n".join(text_parts) method_used = "pandas" break - + elif method_name == "xlrd" and extension == ".xls": import xlrd wb = xlrd.open_workbook(file_path) - + text_parts = [] for sheet in wb.sheets(): text_parts.append(f"Sheet: {sheet.name}") - + for row_idx in range(sheet.nrows): row = sheet.row_values(row_idx) row_text = "\t".join(str(cell) for cell in row) text_parts.append(row_text) - + text = "\n".join(text_parts) method_used = "xlrd" break - + except ImportError: continue except Exception: continue - + if not method_used: raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}") - + return { "text": text, "method_used": method_used, @@ -526,28 +611,28 @@ async def _extract_excel_text(file_path: str, extension: str, preserve_formattin } -async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> Dict[str, Any]: +async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]: """Extract text from PowerPoint documents.""" methods_tried = [] - + if extension == ".pptx": try: import pptx prs = pptx.Presentation(file_path) - + text_parts = [] formatted_sections = [] - + for slide_num, slide in enumerate(prs.slides, 1): slide_text_parts = [] - + for shape in slide.shapes: if hasattr(shape, "text") and shape.text: slide_text_parts.append(shape.text) - + slide_text = "\n".join(slide_text_parts) text_parts.append(f"Slide {slide_num}:\n{slide_text}") - + if preserve_formatting: formatted_sections.append({ "type": "slide", @@ -555,54 +640,55 @@ async def _extract_powerpoint_text(file_path: str, extension: str, preserve_form "text": slide_text, "shapes": len(slide.shapes) }) - + text = "\n\n".join(text_parts) - + return { "text": text, "method_used": "python-pptx", "methods_tried": ["python-pptx"], "formatted_sections": formatted_sections } - + except ImportError: methods_tried.append("python-pptx") - except Exception as e: + except Exception: methods_tried.append("python-pptx") - + # Legacy .ppt handling would require additional libraries if extension == ".ppt": raise OfficeFileError("Legacy PowerPoint (.ppt) text extraction requires additional setup") - + raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}") # Helper functions for image extraction -async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> List[Dict[str, Any]]: +async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: """Extract images from Word documents.""" images = [] - + if extension == ".docx": try: - import zipfile - from PIL import Image import io - + import zipfile + + from PIL import Image + with zipfile.ZipFile(file_path, 'r') as zip_file: # Look for images in media folder image_files = [f for f in zip_file.namelist() if f.startswith('word/media/')] - + for i, img_path in enumerate(image_files): try: img_data = zip_file.read(img_path) img = Image.open(io.BytesIO(img_data)) - + # Size filtering if img.width >= min_width and img.height >= 
min_height: # Save to temp file temp_path = os.path.join(TEMP_DIR, f"word_image_{i}.{output_format}") img.save(temp_path, format=output_format.upper()) - + images.append({ "index": i, "filename": os.path.basename(img_path), @@ -614,38 +700,39 @@ async def _extract_word_images(file_path: str, extension: str, output_format: st }) except Exception: continue - + except Exception as e: raise OfficeFileError(f"Word image extraction failed: {str(e)}") - + return images -async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> List[Dict[str, Any]]: +async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: """Extract images from Excel documents.""" images = [] - + if extension in [".xlsx", ".xlsm"]: try: - import zipfile - from PIL import Image import io - + import zipfile + + from PIL import Image + with zipfile.ZipFile(file_path, 'r') as zip_file: # Look for images in media folder image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')] - + for i, img_path in enumerate(image_files): try: img_data = zip_file.read(img_path) img = Image.open(io.BytesIO(img_data)) - + # Size filtering if img.width >= min_width and img.height >= min_height: # Save to temp file temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}") img.save(temp_path, format=output_format.upper()) - + images.append({ "index": i, "filename": os.path.basename(img_path), @@ -657,38 +744,39 @@ async def _extract_excel_images(file_path: str, extension: str, output_format: s }) except Exception: continue - + except Exception as e: raise OfficeFileError(f"Excel image extraction failed: {str(e)}") - + return images -async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> List[Dict[str, Any]]: +async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]: """Extract images from PowerPoint documents.""" images = [] - + if extension == ".pptx": try: - import zipfile - from PIL import Image import io - + import zipfile + + from PIL import Image + with zipfile.ZipFile(file_path, 'r') as zip_file: # Look for images in media folder image_files = [f for f in zip_file.namelist() if f.startswith('ppt/media/')] - + for i, img_path in enumerate(image_files): try: img_data = zip_file.read(img_path) img = Image.open(io.BytesIO(img_data)) - + # Size filtering if img.width >= min_width and img.height >= min_height: # Save to temp file temp_path = os.path.join(TEMP_DIR, f"powerpoint_image_{i}.{output_format}") img.save(temp_path, format=output_format.upper()) - + images.append({ "index": i, "filename": os.path.basename(img_path), @@ -700,48 +788,48 @@ async def _extract_powerpoint_images(file_path: str, extension: str, output_form }) except Exception: continue - + except Exception as e: raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}") - + return images # Helper functions for metadata extraction -async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> Dict[str, Any]: +async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]: """Extract basic metadata from Office documents.""" metadata = {"category": category, "extension": extension} - + try: if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]: import 
zipfile - + with zipfile.ZipFile(file_path, 'r') as zip_file: # Core properties if 'docProps/core.xml' in zip_file.namelist(): - core_xml = zip_file.read('docProps/core.xml').decode('utf-8') + zip_file.read('docProps/core.xml').decode('utf-8') metadata["has_core_properties"] = True - + # App properties if 'docProps/app.xml' in zip_file.namelist(): - app_xml = zip_file.read('docProps/app.xml').decode('utf-8') + zip_file.read('docProps/app.xml').decode('utf-8') metadata["has_app_properties"] = True - + except Exception: pass - + return metadata -async def _extract_word_metadata(file_path: str, extension: str) -> Dict[str, Any]: +async def _extract_word_metadata(file_path: str, extension: str) -> dict[str, Any]: """Extract Word-specific metadata.""" metadata = {"type": "word", "extension": extension} - + if extension == ".docx": try: import docx doc = docx.Document(file_path) - + core_props = doc.core_properties metadata.update({ "title": core_props.title, @@ -752,7 +840,7 @@ async def _extract_word_metadata(file_path: str, extension: str) -> Dict[str, An "created": str(core_props.created) if core_props.created else None, "modified": str(core_props.modified) if core_props.modified else None }) - + # Document structure metadata.update({ "paragraph_count": len(doc.paragraphs), @@ -760,22 +848,22 @@ async def _extract_word_metadata(file_path: str, extension: str) -> Dict[str, An "has_tables": len(doc.tables) > 0, "table_count": len(doc.tables) }) - + except Exception: pass - + return metadata -async def _extract_excel_metadata(file_path: str, extension: str) -> Dict[str, Any]: +async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]: """Extract Excel-specific metadata.""" metadata = {"type": "excel", "extension": extension} - + if extension in [".xlsx", ".xlsm"]: try: import openpyxl wb = openpyxl.load_workbook(file_path) - + props = wb.properties metadata.update({ "title": props.title, @@ -786,7 +874,7 @@ async def _extract_excel_metadata(file_path: str, extension: str) -> Dict[str, A "created": str(props.created) if props.created else None, "modified": str(props.modified) if props.modified else None }) - + # Workbook structure metadata.update({ "worksheet_count": len(wb.worksheets), @@ -794,22 +882,22 @@ async def _extract_excel_metadata(file_path: str, extension: str) -> Dict[str, A "has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets), "has_images": any(len(ws._images) > 0 for ws in wb.worksheets) }) - + except Exception: pass - + return metadata -async def _extract_powerpoint_metadata(file_path: str, extension: str) -> Dict[str, Any]: +async def _extract_powerpoint_metadata(file_path: str, extension: str) -> dict[str, Any]: """Extract PowerPoint-specific metadata.""" metadata = {"type": "powerpoint", "extension": extension} - + if extension == ".pptx": try: import pptx prs = pptx.Presentation(file_path) - + core_props = prs.core_properties metadata.update({ "title": core_props.title, @@ -820,15 +908,15 @@ async def _extract_powerpoint_metadata(file_path: str, extension: str) -> Dict[s "created": str(core_props.created) if core_props.created else None, "modified": str(core_props.modified) if core_props.modified else None }) - + # Presentation structure slide_layouts = set() total_shapes = 0 - + for slide in prs.slides: slide_layouts.add(slide.slide_layout.name) total_shapes += len(slide.shapes) - + metadata.update({ "slide_count": len(prs.slides), "slide_layouts": list(slide_layouts), @@ -836,77 +924,532 @@ async def 
_extract_powerpoint_metadata(file_path: str, extension: str) -> Dict[s
             "slide_width": prs.slide_width,
             "slide_height": prs.slide_height
         })
-    
+
     except Exception:
         pass
-    
+
     return metadata


-def _calculate_health_score(validation: Dict[str, Any], format_info: Dict[str, Any]) -> int:
+def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int:
     """Calculate document health score (1-10)."""
     score = 10
-    
+
     # Deduct for validation errors
     if not validation["is_valid"]:
         score -= 5
-    
+
     if validation["errors"]:
         score -= len(validation["errors"]) * 2
-    
+
     if validation["warnings"]:
         score -= len(validation["warnings"])
-    
+
     # Deduct for problematic characteristics
     if validation.get("password_protected"):
         score -= 1
-    
+
     if format_info.get("is_legacy"):
         score -= 1
-    
+
     structure = format_info.get("structure", {})
     if structure.get("estimated_complexity") == "complex":
         score -= 1
-    
+
     return max(1, min(10, score))


-def _get_health_recommendations(validation: Dict[str, Any], format_info: Dict[str, Any]) -> List[str]:
+def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]:
     """Get health improvement recommendations."""
     recommendations = []
-    
+
     if validation["errors"]:
         recommendations.append("Fix validation errors before processing")
-    
+
     if validation.get("password_protected"):
         recommendations.append("Remove password protection if possible")
-    
+
     if format_info.get("is_legacy"):
         recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)")
-    
+
     structure = format_info.get("structure", {})
     if structure.get("estimated_complexity") == "complex":
         recommendations.append("Complex document may require specialized processing")
-    
+
     if not recommendations:
         recommendations.append("Document appears healthy and ready for processing")
-    
+
     return recommendations


+# Markdown conversion helper functions
+async def _convert_docx_to_markdown(
+    file_path: str,
+    include_images: bool,
+    image_mode: str,
+    max_image_size: int,
+    preserve_structure: bool,
+    chunk_size: int,
+    output_dir: str
+) -> dict[str, Any]:
+    """Convert .docx file to markdown with comprehensive feature support."""
+    import base64
+
+    try:
+        # Try mammoth first for better HTML->Markdown conversion
+        import mammoth
+
+        # Configure mammoth for markdown-friendly output
+        with open(file_path, "rb") as docx_file:
+            if include_images:
+                # Extract images and handle them based on mode
+                images_info = []
+
+                def convert_image(image):
+                    # image.open() returns a file-like object, not bytes;
+                    # read it out before sizing or encoding.
+                    with image.open() as image_file:
+                        image_data = image_file.read()
+                    content_type = image.content_type
+                    ext = content_type.split('/')[-1] if '/' in content_type else 'png'
+
+                    if image_mode == "base64":
+                        if len(image_data) <= max_image_size:
+                            encoded = base64.b64encode(image_data).decode('utf-8')
+                            images_info.append({
+                                "filename": f"image_{len(images_info)}.{ext}",
+                                "content_type": content_type,
+                                "size_bytes": len(image_data),
+                                "mode": "base64"
+                            })
+                            return {
+                                "src": f"data:{content_type};base64,{encoded}"
+                            }
+                        else:
+                            # Too large for base64, fall back to reference
+                            filename = f"large_image_{len(images_info)}.{ext}"
+                            images_info.append({
+                                "filename": filename,
+                                "content_type": content_type,
+                                "size_bytes": len(image_data),
+                                "mode": "reference",
+                                "note": "Too large for base64 encoding"
+                            })
+                            return {"src": filename}
+
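+                    # Note: mammoth calls convert_image once per embedded image
+                    # and splices the returned dict into the <img> tag's
+                    # attributes, so every branch returns a {"src": ...} mapping.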
+                    elif image_mode == "files":
+                        # Save image to file
+                        nonlocal output_dir
+                        if not output_dir:
+                            output_dir = os.path.join(TEMP_DIR, "markdown_images")
+
+                        os.makedirs(output_dir, exist_ok=True)
+                        filename = f"image_{len(images_info)}.{ext}"
+                        # Distinct name so we don't shadow the outer file_path
+                        # parameter of _convert_docx_to_markdown
+                        image_path = os.path.join(output_dir, filename)
+
+                        with open(image_path, 'wb') as img_file:
+                            img_file.write(image_data)
+
+                        images_info.append({
+                            "filename": filename,
+                            "file_path": image_path,
+                            "content_type": content_type,
+                            "size_bytes": len(image_data),
+                            "mode": "file"
+                        })
+                        return {"src": image_path}
+
+                    else:  # references
+                        filename = f"image_{len(images_info)}.{ext}"
+                        images_info.append({
+                            "filename": filename,
+                            "content_type": content_type,
+                            "size_bytes": len(image_data),
+                            "mode": "reference"
+                        })
+                        return {"src": filename}
+
+                # Convert with image handling
+                result = mammoth.convert_to_html(
+                    docx_file,
+                    convert_image=mammoth.images.img_element(convert_image)
+                )
+
+                html_content = result.value
+                markdown_content = _html_to_markdown(html_content, preserve_structure)
+
+                conversion_result = {
+                    "content": markdown_content,
+                    "method_used": "mammoth-with-images",
+                    "images": images_info
+                }
+
+            else:
+                # Convert without images
+                result = mammoth.convert_to_markdown(docx_file)
+                markdown_content = result.value
+
+                conversion_result = {
+                    "content": markdown_content,
+                    "method_used": "mammoth-markdown",
+                    "images": []
+                }
+
+        # Handle chunking if requested
+        if chunk_size > 0 and len(markdown_content) > chunk_size:
+            chunks = _chunk_markdown(markdown_content, chunk_size)
+            conversion_result["chunks"] = chunks
+
+        # Extract structure information
+        if preserve_structure:
+            structure = _extract_markdown_structure(markdown_content)
+            conversion_result["structure"] = structure
+
+        return conversion_result
+
+    except ImportError:
+        # Fall back to python-docx with custom markdown conversion
+        return await _convert_docx_with_python_docx(
+            file_path, include_images, image_mode, max_image_size,
+            preserve_structure, chunk_size, output_dir
+        )
+    except Exception:
+        # Fall back to python-docx
+        return await _convert_docx_with_python_docx(
+            file_path, include_images, image_mode, max_image_size,
+            preserve_structure, chunk_size, output_dir
+        )
+
+
+async def _convert_docx_with_python_docx(
+    file_path: str,
+    include_images: bool,
+    image_mode: str,
+    max_image_size: int,
+    preserve_structure: bool,
+    chunk_size: int,
+    output_dir: str
+) -> dict[str, Any]:
+    """Convert .docx using python-docx with custom markdown conversion."""
+    import base64
+
+    import docx
+    from docx.oxml.table import CT_Tbl
+    from docx.oxml.text.paragraph import CT_P
+    from docx.table import Table
+    from docx.text.paragraph import Paragraph
+
+    doc = docx.Document(file_path)
+    markdown_parts = []
+    images_info = []
+    structure_info = {"headings": [], "tables": 0, "lists": 0, "paragraphs": 0}
+
+    # Extract images if requested
+    if include_images:
+        extracted_images = await _extract_word_images(file_path, ".docx", "png", 1, 1)
+        for i, img in enumerate(extracted_images):
+            if image_mode == "base64":
+                if img.get("size_bytes", 0) <= max_image_size:
+                    with open(img["path"], "rb") as img_file:
+                        img_data = img_file.read()
+                    encoded = base64.b64encode(img_data).decode('utf-8')
+                    images_info.append({
+                        "filename": img["filename"],
+                        "content_type": f"image/{img.get('format', 'png').lower()}",
+                        "size_bytes": img.get("size_bytes", 0),
+                        "mode": "base64",
+                        "markdown_ref": f"![Image {i+1}](data:image/{img.get('format', 'png').lower()};base64,{encoded})"
+                    })
+                else:
+                    images_info.append({
+                        "filename": img["filename"],
+                        "size_bytes": img.get("size_bytes", 0),
+                        "mode": "reference",
+                        "markdown_ref": f"![Image {i+1}]({img['filename']})",
+                        "note": "Too large for base64 encoding"
+                    })
+            elif image_mode == "files":
+
images_info.append({ + "filename": img["filename"], + "file_path": img["path"], + "size_bytes": img.get("size_bytes", 0), + "mode": "file", + "markdown_ref": f"![Image {i+1}]({img['path']})" + }) + else: # references + images_info.append({ + "filename": img["filename"], + "size_bytes": img.get("size_bytes", 0), + "mode": "reference", + "markdown_ref": f"![Image {i+1}]({img['filename']})" + }) + + # Process document elements + for element in doc.element.body: + if isinstance(element, CT_P): + paragraph = Paragraph(element, doc) + markdown_text = _paragraph_to_markdown(paragraph, preserve_structure) + if markdown_text.strip(): + markdown_parts.append(markdown_text) + structure_info["paragraphs"] += 1 + + # Track headings + if preserve_structure and markdown_text.startswith('#'): + level = len(markdown_text) - len(markdown_text.lstrip('#')) + heading_text = markdown_text.lstrip('# ').strip() + structure_info["headings"].append({ + "level": level, + "text": heading_text, + "position": len(markdown_parts) - 1 + }) + + elif isinstance(element, CT_Tbl): + table = Table(element, doc) + table_markdown = _table_to_markdown(table) + if table_markdown.strip(): + markdown_parts.append(table_markdown) + structure_info["tables"] += 1 + + # Add image references at the end if any + if include_images and images_info: + markdown_parts.append("\n## Images\n") + for img in images_info: + markdown_parts.append(img["markdown_ref"]) + + markdown_content = "\n\n".join(markdown_parts) + + result = { + "content": markdown_content, + "method_used": "python-docx-custom", + "images": images_info + } + + # Handle chunking + if chunk_size > 0 and len(markdown_content) > chunk_size: + chunks = _chunk_markdown(markdown_content, chunk_size) + result["chunks"] = chunks + + # Add structure info + if preserve_structure: + result["structure"] = structure_info + + return result + + +async def _convert_doc_to_markdown( + file_path: str, + include_images: bool, + image_mode: str, + max_image_size: int, + preserve_structure: bool, + chunk_size: int, + output_dir: str +) -> dict[str, Any]: + """Convert legacy .doc file to markdown using available methods.""" + try: + import mammoth + + with open(file_path, "rb") as doc_file: + result = mammoth.convert_to_markdown(doc_file) + markdown_content = result.value + + conversion_result = { + "content": markdown_content, + "method_used": "mammoth-doc", + "images": [] # Legacy .doc image extraction is complex + } + + if chunk_size > 0 and len(markdown_content) > chunk_size: + chunks = _chunk_markdown(markdown_content, chunk_size) + conversion_result["chunks"] = chunks + + if preserve_structure: + structure = _extract_markdown_structure(markdown_content) + conversion_result["structure"] = structure + + return conversion_result + + except ImportError: + raise OfficeFileError("Legacy .doc conversion requires mammoth library") + except Exception as e: + raise OfficeFileError(f"Legacy .doc conversion failed: {str(e)}") + + +def _paragraph_to_markdown(paragraph, preserve_structure: bool) -> str: + """Convert a Word paragraph to markdown format.""" + text = paragraph.text.strip() + if not text: + return "" + + if not preserve_structure: + return text + + # Handle different paragraph styles + style_name = paragraph.style.name.lower() if paragraph.style else "" + + if "heading" in style_name: + # Extract heading level from style name + import re + level_match = re.search(r'(\d+)', style_name) + level = int(level_match.group(1)) if level_match else 1 + return f"{'#' * level} {text}" + elif "title" in 
style_name:
+        return f"# {text}"
+    elif "subtitle" in style_name:
+        return f"## {text}"
+    elif style_name in ["list paragraph", "list"]:
+        return f"- {text}"
+    elif "quote" in style_name:
+        return f"> {text}"
+    else:
+        return text
+
+
+def _table_to_markdown(table) -> str:
+    """Convert a Word table to markdown format."""
+    markdown_rows = []
+
+    for i, row in enumerate(table.rows):
+        # Escape pipes so cell text cannot break the table syntax
+        cells = [cell.text.strip().replace('\n', ' ').replace('|', '\\|') for cell in row.cells]
+        markdown_row = "| " + " | ".join(cells) + " |"
+        markdown_rows.append(markdown_row)
+
+        # Add header separator after first row
+        if i == 0:
+            separator = "| " + " | ".join(["---"] * len(cells)) + " |"
+            markdown_rows.append(separator)
+
+    return "\n".join(markdown_rows)
+
+
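+# NOTE: the regex pass below is a deliberately lightweight fallback, not a real
+# HTML parser; patterns are applied in order, and nested or malformed markup
+# simply passes through unchanged.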
+def _html_to_markdown(html_content: str, preserve_structure: bool) -> str:
+    """Convert HTML content to markdown format."""
+    import re
+
+    # Basic HTML to Markdown conversions
+    conversions = [
+        (r'<h1[^>]*>(.*?)</h1>', r'# \1'),
+        (r'<h2[^>]*>(.*?)</h2>', r'## \1'),
+        (r'<h3[^>]*>(.*?)</h3>', r'### \1'),
+        (r'<h4[^>]*>(.*?)</h4>', r'#### \1'),
+        (r'<h5[^>]*>(.*?)</h5>', r'##### \1'),
+        (r'<h6[^>]*>(.*?)</h6>', r'###### \1'),
+        (r'<strong[^>]*>(.*?)</strong>', r'**\1**'),
+        (r'<b[^>]*>(.*?)</b>', r'**\1**'),
+        (r'<em[^>]*>(.*?)</em>', r'*\1*'),
+        (r'<i[^>]*>(.*?)</i>', r'*\1*'),
+        (r'<code[^>]*>(.*?)</code>', r'`\1`'),
+        (r'<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', r'[\2](\1)'),
+        (r'<img[^>]*src="([^"]*)"[^>]*/?>', r'![](\1)'),
+        (r'<p[^>]*>(.*?)</p>', r'\1\n'),
+        (r'<br[^>]*/?>', r'\n'),
+        (r'<li[^>]*>(.*?)</li>', r'- \1'),
+        (r'<ul[^>]*>(.*?)</ul>', r'\1'),
+        (r'<ol[^>]*>(.*?)</ol>', r'\1'),
+        (r'<blockquote[^>]*>(.*?)</blockquote>', r'> \1'),
+    ]
+
+    markdown = html_content
+    for pattern, replacement in conversions:
+        markdown = re.sub(pattern, replacement, markdown, flags=re.DOTALL | re.IGNORECASE)
+
+    # Clean up extra whitespace
+    markdown = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown)
+    markdown = re.sub(r'^\s+|\s+$', '', markdown, flags=re.MULTILINE)
+
+    return markdown
+
+
+def _chunk_markdown(content: str, chunk_size: int) -> list[dict[str, Any]]:
+    """Split markdown content into chunks while preserving structure."""
+    chunks = []
+    lines = content.split('\n')
+    current_chunk = []
+    current_size = 0
+    chunk_num = 1
+
+    for line in lines:
+        line_size = len(line) + 1  # +1 for newline
+
+        # If adding this line would exceed chunk size and we have content
+        if current_size + line_size > chunk_size and current_chunk:
+            chunks.append({
+                "chunk_number": chunk_num,
+                "content": '\n'.join(current_chunk),
+                "character_count": current_size,
+                "line_count": len(current_chunk)
+            })
+            current_chunk = []
+            current_size = 0
+            chunk_num += 1
+
+        current_chunk.append(line)
+        current_size += line_size
+
+    # Add final chunk if there's remaining content
+    if current_chunk:
+        chunks.append({
+            "chunk_number": chunk_num,
+            "content": '\n'.join(current_chunk),
+            "character_count": current_size,
+            "line_count": len(current_chunk)
+        })
+
+    return chunks
+
+
+def _extract_markdown_structure(content: str) -> dict[str, Any]:
+    """Extract structure information from markdown content."""
+    import re
+
+    structure = {
+        "headings": [],
+        "lists": 0,
+        "links": 0,
+        "images": 0,
+        "code_blocks": 0,
+        "tables": 0,
+        "line_count": len(content.split('\n'))
+    }
+
+    lines = content.split('\n')
+    for i, line in enumerate(lines):
+        # Find headings
+        heading_match = re.match(r'^(#{1,6})\s+(.+)', line)
+        if heading_match:
+            level = len(heading_match.group(1))
+            text = heading_match.group(2).strip()
+            structure["headings"].append({
+                "level": level,
+                "text": text,
+                "line_number": i + 1
+            })
+
+        # Count other elements
+        if re.match(r'^[-*+]\s+', line):
+            structure["lists"] += 1
+
+        structure["links"] += len(re.findall(r'\[([^\]]+)\]\([^)]+\)', line))
+        structure["images"] += len(re.findall(r'!\[([^\]]*)\]\([^)]+\)', line))
+
+        if line.strip().startswith('```'):
+            structure["code_blocks"] += 1
+
+        if '|' in line and line.count('|') >= 2:
+            structure["tables"] += 1
+
+    return structure
+
+
 def main():
     """Main entry point for the MCP server."""
-    import asyncio
     import sys
-    
+
     if len(sys.argv) > 1 and sys.argv[1] == "--version":
         from . import __version__
         print(f"MCP Office Tools v{__version__}")
         return
-    
+
     # Run the FastMCP server
     app.run()


 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
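
For a quick smoke test of the new tool, FastMCP's in-process client can drive it directly. A minimal sketch, assuming FastMCP 2.x's Client API and a placeholder sample.docx; this snippet is illustrative only and not part of the patch:

import asyncio

from fastmcp import Client

from mcp_office_tools.server import app


async def demo():
    # Open an in-process session against the server defined above
    async with Client(app) as client:
        result = await client.call_tool("convert_to_markdown", {
            "file_path": "sample.docx",   # placeholder document
            "image_mode": "references",   # keep the payload small
            "chunk_size": 20000,          # exercise the chunking path
        })
        print(result)


asyncio.run(demo())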