✨ Add comprehensive Markdown conversion with image support
- Add convert_to_markdown tool for .docx/.doc files
- Support multiple image handling modes (base64, files, references)
- Implement large document chunking for performance
- Preserve document structure (headings, lists, tables)
- Smart fallback methods (mammoth → python-docx → custom)
- Handle both modern and legacy Word formats
This commit is contained in:
parent
1b359c4c7c
commit
b3caed78d3
@ -4,23 +4,22 @@ FastMCP server providing 30+ tools for processing Word, Excel, PowerPoint docume
|
||||
including both modern formats (.docx, .xlsx, .pptx) and legacy formats (.doc, .xls, .ppt).
|
||||
"""
|
||||
|
||||
import time
|
||||
import tempfile
|
||||
import os
|
||||
from typing import Dict, Any, List, Optional, Union
|
||||
import tempfile
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from fastmcp import FastMCP
|
||||
from pydantic import Field
|
||||
|
||||
from .utils import (
|
||||
OfficeFileError,
|
||||
validate_office_file,
|
||||
validate_office_path,
|
||||
detect_format,
|
||||
classify_document_type,
|
||||
detect_format,
|
||||
get_supported_extensions,
|
||||
resolve_office_file_path,
|
||||
get_supported_extensions
|
||||
validate_office_file,
|
||||
)
|
||||
|
||||
# Initialize FastMCP app
|
||||
@ -37,7 +36,7 @@ async def extract_text(
|
||||
preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"),
|
||||
include_metadata: bool = Field(default=True, description="Include document metadata in output"),
|
||||
method: str = Field(default="auto", description="Extraction method: auto, primary, fallback")
|
||||
) -> Dict[str, Any]:
|
||||
) -> dict[str, Any]:
|
||||
"""Extract text content from Office documents with intelligent method selection.
|
||||
|
||||
Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt),
|
||||
@ -105,7 +104,7 @@ async def extract_images(
|
||||
min_width: int = Field(default=100, description="Minimum image width in pixels"),
|
||||
min_height: int = Field(default=100, description="Minimum image height in pixels"),
|
||||
include_metadata: bool = Field(default=True, description="Include image metadata")
|
||||
) -> Dict[str, Any]:
|
||||
) -> dict[str, Any]:
|
||||
"""Extract images from Office documents with size filtering and format conversion."""
|
||||
start_time = time.time()
|
||||
|
||||
@ -158,7 +157,7 @@ async def extract_images(
|
||||
@app.tool()
|
||||
async def extract_metadata(
|
||||
file_path: str = Field(description="Path to Office document or URL")
|
||||
) -> Dict[str, Any]:
|
||||
) -> dict[str, Any]:
|
||||
"""Extract comprehensive metadata from Office documents."""
|
||||
start_time = time.time()
|
||||
|
||||
@ -215,7 +214,7 @@ async def extract_metadata(
|
||||
@app.tool()
|
||||
async def detect_office_format(
|
||||
file_path: str = Field(description="Path to Office document or URL")
|
||||
) -> Dict[str, Any]:
|
||||
) -> dict[str, Any]:
|
||||
"""Intelligent Office document format detection and analysis."""
|
||||
start_time = time.time()
|
||||
|
||||
@ -249,7 +248,7 @@ async def detect_office_format(
|
||||
@app.tool()
|
||||
async def analyze_document_health(
|
||||
file_path: str = Field(description="Path to Office document or URL")
|
||||
) -> Dict[str, Any]:
|
||||
) -> dict[str, Any]:
|
||||
"""Comprehensive document health and integrity analysis."""
|
||||
start_time = time.time()
|
||||
|
||||
@ -286,7 +285,93 @@ async def analyze_document_health(
|
||||
|
||||
|
||||
@app.tool()
async def convert_to_markdown(
    file_path: str = Field(description="Path to Office document or URL"),
    include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"),
    image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"),
    max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"),
    preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"),
    chunk_size: int = Field(default=0, description="Split large documents into chunks (0 = no chunking)"),
    output_dir: str = Field(default="", description="Output directory for image files (if image_mode='files')")
) -> dict[str, Any]:
    """Convert Office documents to Markdown format with image support and structure preservation.

    Handles large .docx files efficiently with options for image embedding, file extraction,
    and document chunking for very large files.

    Returns a dict with "markdown", "metadata", and optionally "chunks",
    "images", and "structure" keys.

    Raises:
        OfficeFileError: If the file is invalid, is not a Word document,
            or conversion fails for any reason (the original exception is
            chained as the cause).
    """
    start_time = time.time()

    try:
        # Resolve file path (may fetch a URL to a local temp file)
        local_path = await resolve_office_file_path(file_path)

        # Validate file before attempting conversion
        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        # Get format info
        format_info = await detect_format(local_path)
        category = format_info["category"]
        extension = format_info["extension"]

        # Currently focused on Word documents for markdown conversion
        if category != "word":
            raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}")

        # Convert to markdown based on format
        if extension == ".docx":
            markdown_result = await _convert_docx_to_markdown(
                local_path, include_images, image_mode, max_image_size,
                preserve_structure, chunk_size, output_dir
            )
        else:  # .doc
            # For legacy .doc files, use mammoth if available
            markdown_result = await _convert_doc_to_markdown(
                local_path, include_images, image_mode, max_image_size,
                preserve_structure, chunk_size, output_dir
            )

        result = {
            "markdown": markdown_result["content"],
            "metadata": {
                "original_file": os.path.basename(local_path),
                "format": format_info["format_name"],
                "conversion_method": markdown_result["method_used"],
                "character_count": len(markdown_result["content"]),
                "word_count": len(markdown_result["content"].split()),
                "conversion_time": round(time.time() - start_time, 3)
            }
        }

        # Add chunking info if applicable
        if chunk_size > 0 and markdown_result.get("chunks"):
            result["chunks"] = markdown_result["chunks"]
            result["metadata"]["chunk_count"] = len(markdown_result["chunks"])

        # Add image info
        if include_images and markdown_result.get("images"):
            result["images"] = markdown_result["images"]
            result["metadata"]["image_count"] = len(markdown_result["images"])
            result["metadata"]["total_image_size"] = sum(
                img.get("size_bytes", 0) for img in markdown_result["images"]
            )

        # Add structure info
        if preserve_structure and markdown_result.get("structure"):
            result["structure"] = markdown_result["structure"]

        return result

    except Exception as e:
        if DEBUG:
            import traceback
            traceback.print_exc()
        # Chain the original exception so the real cause is preserved
        raise OfficeFileError(f"Markdown conversion failed: {str(e)}") from e
|
||||
|
||||
|
||||
@app.tool()
|
||||
async def get_supported_formats() -> dict[str, Any]:
|
||||
"""Get list of all supported Office document formats and their capabilities."""
|
||||
extensions = get_supported_extensions()
|
||||
|
||||
@ -314,7 +399,7 @@ async def get_supported_formats() -> Dict[str, Any]:
|
||||
|
||||
|
||||
# Helper functions for text extraction
|
||||
async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> Dict[str, Any]:
|
||||
async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
|
||||
"""Extract text from Word documents with fallback methods."""
|
||||
methods_tried = []
|
||||
|
||||
@ -414,7 +499,7 @@ async def _extract_word_text(file_path: str, extension: str, preserve_formatting
|
||||
}
|
||||
|
||||
|
||||
async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> Dict[str, Any]:
|
||||
async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
|
||||
"""Extract text from Excel documents."""
|
||||
methods_tried = []
|
||||
|
||||
@ -526,7 +611,7 @@ async def _extract_excel_text(file_path: str, extension: str, preserve_formattin
|
||||
}
|
||||
|
||||
|
||||
async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> Dict[str, Any]:
|
||||
async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
|
||||
"""Extract text from PowerPoint documents."""
|
||||
methods_tried = []
|
||||
|
||||
@ -567,7 +652,7 @@ async def _extract_powerpoint_text(file_path: str, extension: str, preserve_form
|
||||
|
||||
except ImportError:
|
||||
methods_tried.append("python-pptx")
|
||||
except Exception as e:
|
||||
except Exception:
|
||||
methods_tried.append("python-pptx")
|
||||
|
||||
# Legacy .ppt handling would require additional libraries
|
||||
@ -578,15 +663,16 @@ async def _extract_powerpoint_text(file_path: str, extension: str, preserve_form
|
||||
|
||||
|
||||
# Helper functions for image extraction
|
||||
async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> List[Dict[str, Any]]:
|
||||
async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
|
||||
"""Extract images from Word documents."""
|
||||
images = []
|
||||
|
||||
if extension == ".docx":
|
||||
try:
|
||||
import zipfile
|
||||
from PIL import Image
|
||||
import io
|
||||
import zipfile
|
||||
|
||||
from PIL import Image
|
||||
|
||||
with zipfile.ZipFile(file_path, 'r') as zip_file:
|
||||
# Look for images in media folder
|
||||
@ -621,15 +707,16 @@ async def _extract_word_images(file_path: str, extension: str, output_format: st
|
||||
return images
|
||||
|
||||
|
||||
async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> List[Dict[str, Any]]:
|
||||
async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
|
||||
"""Extract images from Excel documents."""
|
||||
images = []
|
||||
|
||||
if extension in [".xlsx", ".xlsm"]:
|
||||
try:
|
||||
import zipfile
|
||||
from PIL import Image
|
||||
import io
|
||||
import zipfile
|
||||
|
||||
from PIL import Image
|
||||
|
||||
with zipfile.ZipFile(file_path, 'r') as zip_file:
|
||||
# Look for images in media folder
|
||||
@ -664,15 +751,16 @@ async def _extract_excel_images(file_path: str, extension: str, output_format: s
|
||||
return images
|
||||
|
||||
|
||||
async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> List[Dict[str, Any]]:
|
||||
async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
|
||||
"""Extract images from PowerPoint documents."""
|
||||
images = []
|
||||
|
||||
if extension == ".pptx":
|
||||
try:
|
||||
import zipfile
|
||||
from PIL import Image
|
||||
import io
|
||||
import zipfile
|
||||
|
||||
from PIL import Image
|
||||
|
||||
with zipfile.ZipFile(file_path, 'r') as zip_file:
|
||||
# Look for images in media folder
|
||||
@ -708,7 +796,7 @@ async def _extract_powerpoint_images(file_path: str, extension: str, output_form
|
||||
|
||||
|
||||
# Helper functions for metadata extraction
|
||||
async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> Dict[str, Any]:
|
||||
async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]:
|
||||
"""Extract basic metadata from Office documents."""
|
||||
metadata = {"category": category, "extension": extension}
|
||||
|
||||
@ -719,12 +807,12 @@ async def _extract_basic_metadata(file_path: str, extension: str, category: str)
|
||||
with zipfile.ZipFile(file_path, 'r') as zip_file:
|
||||
# Core properties
|
||||
if 'docProps/core.xml' in zip_file.namelist():
|
||||
core_xml = zip_file.read('docProps/core.xml').decode('utf-8')
|
||||
zip_file.read('docProps/core.xml').decode('utf-8')
|
||||
metadata["has_core_properties"] = True
|
||||
|
||||
# App properties
|
||||
if 'docProps/app.xml' in zip_file.namelist():
|
||||
app_xml = zip_file.read('docProps/app.xml').decode('utf-8')
|
||||
zip_file.read('docProps/app.xml').decode('utf-8')
|
||||
metadata["has_app_properties"] = True
|
||||
|
||||
except Exception:
|
||||
@ -733,7 +821,7 @@ async def _extract_basic_metadata(file_path: str, extension: str, category: str)
|
||||
return metadata
|
||||
|
||||
|
||||
async def _extract_word_metadata(file_path: str, extension: str) -> Dict[str, Any]:
|
||||
async def _extract_word_metadata(file_path: str, extension: str) -> dict[str, Any]:
|
||||
"""Extract Word-specific metadata."""
|
||||
metadata = {"type": "word", "extension": extension}
|
||||
|
||||
@ -767,7 +855,7 @@ async def _extract_word_metadata(file_path: str, extension: str) -> Dict[str, An
|
||||
return metadata
|
||||
|
||||
|
||||
async def _extract_excel_metadata(file_path: str, extension: str) -> Dict[str, Any]:
|
||||
async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]:
|
||||
"""Extract Excel-specific metadata."""
|
||||
metadata = {"type": "excel", "extension": extension}
|
||||
|
||||
@ -801,7 +889,7 @@ async def _extract_excel_metadata(file_path: str, extension: str) -> Dict[str, A
|
||||
return metadata
|
||||
|
||||
|
||||
async def _extract_powerpoint_metadata(file_path: str, extension: str) -> Dict[str, Any]:
|
||||
async def _extract_powerpoint_metadata(file_path: str, extension: str) -> dict[str, Any]:
|
||||
"""Extract PowerPoint-specific metadata."""
|
||||
metadata = {"type": "powerpoint", "extension": extension}
|
||||
|
||||
@ -843,7 +931,7 @@ async def _extract_powerpoint_metadata(file_path: str, extension: str) -> Dict[s
|
||||
return metadata
|
||||
|
||||
|
||||
def _calculate_health_score(validation: Dict[str, Any], format_info: Dict[str, Any]) -> int:
|
||||
def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int:
|
||||
"""Calculate document health score (1-10)."""
|
||||
score = 10
|
||||
|
||||
@ -871,7 +959,7 @@ def _calculate_health_score(validation: Dict[str, Any], format_info: Dict[str, A
|
||||
return max(1, min(10, score))
|
||||
|
||||
|
||||
def _get_health_recommendations(validation: Dict[str, Any], format_info: Dict[str, Any]) -> List[str]:
|
||||
def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]:
|
||||
"""Get health improvement recommendations."""
|
||||
recommendations = []
|
||||
|
||||
@ -894,9 +982,464 @@ def _get_health_recommendations(validation: Dict[str, Any], format_info: Dict[st
|
||||
return recommendations
|
||||
|
||||
|
||||
# Markdown conversion helper functions
|
||||
async def _convert_docx_to_markdown(
    file_path: str,
    include_images: bool,
    image_mode: str,
    max_image_size: int,
    preserve_structure: bool,
    chunk_size: int,
    output_dir: str
) -> dict[str, Any]:
    """Convert .docx file to markdown with comprehensive feature support.

    Tries mammoth first (best HTML -> Markdown fidelity); if mammoth is not
    installed or its conversion fails, falls back to the python-docx based
    converter. Returns a dict with "content", "method_used", "images", and
    optionally "chunks" and "structure".
    """
    import base64

    try:
        # Try mammoth first for better HTML->Markdown conversion
        import mammoth

        # Configure mammoth for markdown-friendly output
        with open(file_path, "rb") as docx_file:
            if include_images:
                # Metadata for every image encountered during conversion
                images_info = []

                def convert_image(image):
                    # mammoth hands us a lazily-opened image: image.open()
                    # returns a file-like object, not bytes, so read it once.
                    # (Previously the file object itself was passed to len()
                    # and b64encode, which raised and silently forced the
                    # python-docx fallback.)
                    with image.open() as image_stream:
                        image_data = image_stream.read()
                    content_type = image.content_type
                    ext = content_type.split('/')[-1] if '/' in content_type else 'png'

                    if image_mode == "base64":
                        if len(image_data) <= max_image_size:
                            encoded = base64.b64encode(image_data).decode('utf-8')
                            images_info.append({
                                "filename": f"image_{len(images_info)}.{ext}",
                                "content_type": content_type,
                                "size_bytes": len(image_data),
                                "mode": "base64"
                            })
                            return {
                                "src": f"data:{content_type};base64,{encoded}"
                            }
                        else:
                            # Too large for base64, fall back to reference
                            filename = f"large_image_{len(images_info)}.{ext}"
                            images_info.append({
                                "filename": filename,
                                "content_type": content_type,
                                "size_bytes": len(image_data),
                                "mode": "reference",
                                "note": "Too large for base64 encoding"
                            })
                            return {"src": filename}

                    elif image_mode == "files":
                        # Save image to a file on disk
                        nonlocal output_dir
                        if not output_dir:
                            output_dir = os.path.join(TEMP_DIR, "markdown_images")

                        os.makedirs(output_dir, exist_ok=True)
                        filename = f"image_{len(images_info)}.{ext}"
                        # Distinct name: do not shadow the enclosing
                        # function's file_path argument.
                        image_path = os.path.join(output_dir, filename)

                        with open(image_path, 'wb') as img_file:
                            img_file.write(image_data)

                        images_info.append({
                            "filename": filename,
                            "file_path": image_path,
                            "content_type": content_type,
                            "size_bytes": len(image_data),
                            "mode": "file"
                        })
                        return {"src": image_path}

                    else:  # references
                        filename = f"image_{len(images_info)}.{ext}"
                        images_info.append({
                            "filename": filename,
                            "content_type": content_type,
                            "size_bytes": len(image_data),
                            "mode": "reference"
                        })
                        return {"src": filename}

                # Convert with image handling
                result = mammoth.convert_to_html(
                    docx_file,
                    convert_image=mammoth.images.img_element(convert_image)
                )

                html_content = result.value
                markdown_content = _html_to_markdown(html_content, preserve_structure)

                conversion_result = {
                    "content": markdown_content,
                    "method_used": "mammoth-with-images",
                    "images": images_info
                }

            else:
                # Convert without images
                result = mammoth.convert_to_markdown(docx_file)
                markdown_content = result.value

                conversion_result = {
                    "content": markdown_content,
                    "method_used": "mammoth-markdown",
                    "images": []
                }

        # Handle chunking if requested
        if chunk_size > 0 and len(markdown_content) > chunk_size:
            chunks = _chunk_markdown(markdown_content, chunk_size)
            conversion_result["chunks"] = chunks

        # Extract structure information
        if preserve_structure:
            structure = _extract_markdown_structure(markdown_content)
            conversion_result["structure"] = structure

        return conversion_result

    except ImportError:
        # mammoth not installed: fall back to python-docx with custom markdown conversion
        return await _convert_docx_with_python_docx(
            file_path, include_images, image_mode, max_image_size,
            preserve_structure, chunk_size, output_dir
        )
    except Exception:
        # Any mammoth conversion failure: fall back to python-docx
        return await _convert_docx_with_python_docx(
            file_path, include_images, image_mode, max_image_size,
            preserve_structure, chunk_size, output_dir
        )
|
||||
|
||||
|
||||
async def _convert_docx_with_python_docx(
    file_path: str,
    include_images: bool,
    image_mode: str,
    max_image_size: int,
    preserve_structure: bool,
    chunk_size: int,
    output_dir: str
) -> dict[str, Any]:
    """Convert .docx using python-docx with custom markdown conversion.

    Fallback path when mammoth is unavailable or fails. Walks the document
    body in order, converting paragraphs and tables, and appends an
    "## Images" section with one markdown image reference per extracted
    image. Returns a dict with "content", "method_used", "images", and
    optionally "chunks" and "structure".
    """
    import base64

    import docx
    from docx.oxml.table import CT_Tbl
    from docx.oxml.text.paragraph import CT_P
    from docx.table import Table
    from docx.text.paragraph import Paragraph

    doc = docx.Document(file_path)
    markdown_parts = []
    images_info = []
    structure_info = {"headings": [], "tables": 0, "lists": 0, "paragraphs": 0}

    # Extract images if requested
    if include_images:
        extracted_images = await _extract_word_images(file_path, ".docx", "png", 1, 1)
        for img in extracted_images:
            if image_mode == "base64":
                if img.get("size_bytes", 0) <= max_image_size:
                    with open(img["path"], "rb") as img_file:
                        img_data = img_file.read()
                    encoded = base64.b64encode(img_data).decode('utf-8')
                    images_info.append({
                        "filename": img["filename"],
                        "content_type": f"image/{img.get('format', 'png').lower()}",
                        "size_bytes": img.get("size_bytes", 0),
                        "mode": "base64",
                        # Inline data URI so the markdown is self-contained
                        "markdown_ref": (
                            f"![{img['filename']}]"
                            f"(data:image/{img.get('format', 'png').lower()};base64,{encoded})"
                        )
                    })
                else:
                    images_info.append({
                        "filename": img["filename"],
                        "size_bytes": img.get("size_bytes", 0),
                        "mode": "reference",
                        "markdown_ref": f"![{img['filename']}]({img['filename']})",
                        "note": "Too large for base64 encoding"
                    })
            elif image_mode == "files":
                images_info.append({
                    "filename": img["filename"],
                    "file_path": img["path"],
                    "size_bytes": img.get("size_bytes", 0),
                    "mode": "file",
                    "markdown_ref": f"![{img['filename']}]({img['path']})"
                })
            else:  # references
                images_info.append({
                    "filename": img["filename"],
                    "size_bytes": img.get("size_bytes", 0),
                    "mode": "reference",
                    "markdown_ref": f"![{img['filename']}]({img['filename']})"
                })

    # Process document elements in document order (paragraphs and tables)
    for element in doc.element.body:
        if isinstance(element, CT_P):
            paragraph = Paragraph(element, doc)
            markdown_text = _paragraph_to_markdown(paragraph, preserve_structure)
            if markdown_text.strip():
                markdown_parts.append(markdown_text)
                structure_info["paragraphs"] += 1

                # Track headings
                if preserve_structure and markdown_text.startswith('#'):
                    level = len(markdown_text) - len(markdown_text.lstrip('#'))
                    heading_text = markdown_text.lstrip('# ').strip()
                    structure_info["headings"].append({
                        "level": level,
                        "text": heading_text,
                        "position": len(markdown_parts) - 1
                    })

        elif isinstance(element, CT_Tbl):
            table = Table(element, doc)
            table_markdown = _table_to_markdown(table)
            if table_markdown.strip():
                markdown_parts.append(table_markdown)
                structure_info["tables"] += 1

    # Add image references at the end if any
    if include_images and images_info:
        markdown_parts.append("\n## Images\n")
        for img in images_info:
            markdown_parts.append(img["markdown_ref"])

    markdown_content = "\n\n".join(markdown_parts)

    result = {
        "content": markdown_content,
        "method_used": "python-docx-custom",
        "images": images_info
    }

    # Handle chunking
    if chunk_size > 0 and len(markdown_content) > chunk_size:
        chunks = _chunk_markdown(markdown_content, chunk_size)
        result["chunks"] = chunks

    # Add structure info
    if preserve_structure:
        result["structure"] = structure_info

    return result
|
||||
|
||||
|
||||
async def _convert_doc_to_markdown(
    file_path: str,
    include_images: bool,
    image_mode: str,
    max_image_size: int,
    preserve_structure: bool,
    chunk_size: int,
    output_dir: str
) -> dict[str, Any]:
    """Convert legacy .doc file to markdown using available methods.

    Only mammoth handles the legacy binary format here; images are never
    extracted (include_images/image_mode/max_image_size/output_dir are
    accepted for interface parity with the .docx converter but unused).

    Raises:
        OfficeFileError: If mammoth is unavailable or conversion fails
            (the underlying exception is chained as the cause).
    """
    try:
        import mammoth

        with open(file_path, "rb") as doc_file:
            result = mammoth.convert_to_markdown(doc_file)
            markdown_content = result.value

        conversion_result = {
            "content": markdown_content,
            "method_used": "mammoth-doc",
            "images": []  # Legacy .doc image extraction is complex
        }

        if chunk_size > 0 and len(markdown_content) > chunk_size:
            chunks = _chunk_markdown(markdown_content, chunk_size)
            conversion_result["chunks"] = chunks

        if preserve_structure:
            structure = _extract_markdown_structure(markdown_content)
            conversion_result["structure"] = structure

        return conversion_result

    except ImportError:
        # Deliberate: no fallback exists for the legacy binary format.
        raise OfficeFileError("Legacy .doc conversion requires mammoth library") from None
    except Exception as e:
        # Chain the cause so callers can inspect the underlying failure.
        raise OfficeFileError(f"Legacy .doc conversion failed: {str(e)}") from e
|
||||
|
||||
|
||||
def _paragraph_to_markdown(paragraph, preserve_structure: bool) -> str:
|
||||
"""Convert a Word paragraph to markdown format."""
|
||||
text = paragraph.text.strip()
|
||||
if not text:
|
||||
return ""
|
||||
|
||||
if not preserve_structure:
|
||||
return text
|
||||
|
||||
# Handle different paragraph styles
|
||||
style_name = paragraph.style.name.lower() if paragraph.style else ""
|
||||
|
||||
if "heading" in style_name:
|
||||
# Extract heading level from style name
|
||||
import re
|
||||
level_match = re.search(r'(\d+)', style_name)
|
||||
level = int(level_match.group(1)) if level_match else 1
|
||||
return f"{'#' * level} {text}"
|
||||
elif "title" in style_name:
|
||||
return f"# {text}"
|
||||
elif "subtitle" in style_name:
|
||||
return f"## {text}"
|
||||
elif style_name in ["list paragraph", "list"]:
|
||||
return f"- {text}"
|
||||
elif "quote" in style_name:
|
||||
return f"> {text}"
|
||||
else:
|
||||
return text
|
||||
|
||||
|
||||
def _table_to_markdown(table) -> str:
|
||||
"""Convert a Word table to markdown format."""
|
||||
markdown_rows = []
|
||||
|
||||
for i, row in enumerate(table.rows):
|
||||
cells = [cell.text.strip().replace('\n', ' ') for cell in row.cells]
|
||||
markdown_row = "| " + " | ".join(cells) + " |"
|
||||
markdown_rows.append(markdown_row)
|
||||
|
||||
# Add header separator after first row
|
||||
if i == 0:
|
||||
separator = "| " + " | ".join(["---"] * len(cells)) + " |"
|
||||
markdown_rows.append(separator)
|
||||
|
||||
return "\n".join(markdown_rows)
|
||||
|
||||
|
||||
def _html_to_markdown(html_content: str, preserve_structure: bool) -> str:
|
||||
"""Convert HTML content to markdown format."""
|
||||
import re
|
||||
|
||||
# Basic HTML to Markdown conversions
|
||||
conversions = [
|
||||
(r'<h1[^>]*>(.*?)</h1>', r'# \1'),
|
||||
(r'<h2[^>]*>(.*?)</h2>', r'## \1'),
|
||||
(r'<h3[^>]*>(.*?)</h3>', r'### \1'),
|
||||
(r'<h4[^>]*>(.*?)</h4>', r'#### \1'),
|
||||
(r'<h5[^>]*>(.*?)</h5>', r'##### \1'),
|
||||
(r'<h6[^>]*>(.*?)</h6>', r'###### \1'),
|
||||
(r'<strong[^>]*>(.*?)</strong>', r'**\1**'),
|
||||
(r'<b[^>]*>(.*?)</b>', r'**\1**'),
|
||||
(r'<em[^>]*>(.*?)</em>', r'*\1*'),
|
||||
(r'<i[^>]*>(.*?)</i>', r'*\1*'),
|
||||
(r'<code[^>]*>(.*?)</code>', r'`\1`'),
|
||||
(r'<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', r'[\2](\1)'),
|
||||
(r'<img[^>]*src="([^"]*)"[^>]*/?>', r''),
|
||||
(r'<p[^>]*>(.*?)</p>', r'\1\n'),
|
||||
(r'<br[^>]*/?>', r'\n'),
|
||||
(r'<li[^>]*>(.*?)</li>', r'- \1'),
|
||||
(r'<ul[^>]*>(.*?)</ul>', r'\1'),
|
||||
(r'<ol[^>]*>(.*?)</ol>', r'\1'),
|
||||
(r'<blockquote[^>]*>(.*?)</blockquote>', r'> \1'),
|
||||
]
|
||||
|
||||
markdown = html_content
|
||||
for pattern, replacement in conversions:
|
||||
markdown = re.sub(pattern, replacement, markdown, flags=re.DOTALL | re.IGNORECASE)
|
||||
|
||||
# Clean up extra whitespace
|
||||
markdown = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown)
|
||||
markdown = re.sub(r'^\s+|\s+$', '', markdown, flags=re.MULTILINE)
|
||||
|
||||
return markdown
|
||||
|
||||
|
||||
def _chunk_markdown(content: str, chunk_size: int) -> list[dict[str, Any]]:
|
||||
"""Split markdown content into chunks while preserving structure."""
|
||||
chunks = []
|
||||
lines = content.split('\n')
|
||||
current_chunk = []
|
||||
current_size = 0
|
||||
chunk_num = 1
|
||||
|
||||
for line in lines:
|
||||
line_size = len(line) + 1 # +1 for newline
|
||||
|
||||
# If adding this line would exceed chunk size and we have content
|
||||
if current_size + line_size > chunk_size and current_chunk:
|
||||
chunks.append({
|
||||
"chunk_number": chunk_num,
|
||||
"content": '\n'.join(current_chunk),
|
||||
"character_count": current_size,
|
||||
"line_count": len(current_chunk)
|
||||
})
|
||||
current_chunk = []
|
||||
current_size = 0
|
||||
chunk_num += 1
|
||||
|
||||
current_chunk.append(line)
|
||||
current_size += line_size
|
||||
|
||||
# Add final chunk if there's remaining content
|
||||
if current_chunk:
|
||||
chunks.append({
|
||||
"chunk_number": chunk_num,
|
||||
"content": '\n'.join(current_chunk),
|
||||
"character_count": current_size,
|
||||
"line_count": len(current_chunk)
|
||||
})
|
||||
|
||||
return chunks
|
||||
|
||||
|
||||
def _extract_markdown_structure(content: str) -> dict[str, Any]:
|
||||
"""Extract structure information from markdown content."""
|
||||
import re
|
||||
|
||||
structure = {
|
||||
"headings": [],
|
||||
"lists": 0,
|
||||
"links": 0,
|
||||
"images": 0,
|
||||
"code_blocks": 0,
|
||||
"tables": 0,
|
||||
"line_count": len(content.split('\n'))
|
||||
}
|
||||
|
||||
lines = content.split('\n')
|
||||
for i, line in enumerate(lines):
|
||||
# Find headings
|
||||
heading_match = re.match(r'^(#{1,6})\s+(.+)', line)
|
||||
if heading_match:
|
||||
level = len(heading_match.group(1))
|
||||
text = heading_match.group(2).strip()
|
||||
structure["headings"].append({
|
||||
"level": level,
|
||||
"text": text,
|
||||
"line_number": i + 1
|
||||
})
|
||||
|
||||
# Count other elements
|
||||
if re.match(r'^[-*+]\s+', line):
|
||||
structure["lists"] += 1
|
||||
|
||||
structure["links"] += len(re.findall(r'\[([^\]]+)\]\([^)]+\)', line))
|
||||
structure["images"] += len(re.findall(r'!\[([^\]]*)\]\([^)]+\)', line))
|
||||
|
||||
if line.strip().startswith('```'):
|
||||
structure["code_blocks"] += 1
|
||||
|
||||
if '|' in line and line.count('|') >= 2:
|
||||
structure["tables"] += 1
|
||||
|
||||
return structure
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point for the MCP server."""
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "--version":
|
||||
|
Loading…
x
Reference in New Issue
Block a user