- Bypass all complex processing in summary_only mode - Extract only first 50 paragraphs, max 10 headings, 5 content paragraphs - Add bookmark detection for chapter navigation hints - Limit summary content to 2000 chars max - Prevent 1,282,370 token responses with surgical precision - Show bookmark names as chapter start indicators
"""MCP Office Tools Server - Comprehensive Microsoft Office document processing.
|
|
|
|
FastMCP server providing 30+ tools for processing Word, Excel, PowerPoint documents
|
|
including both modern formats (.docx, .xlsx, .pptx) and legacy formats (.doc, .xls, .ppt).
|
|
"""
|
|
|
|
import os
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from fastmcp import FastMCP
|
|
from pydantic import Field
|
|
|
|
from .utils import (
|
|
OfficeFileError,
|
|
classify_document_type,
|
|
detect_format,
|
|
get_supported_extensions,
|
|
resolve_office_file_path,
|
|
validate_office_file,
|
|
)
|
|
|
|
# Initialize FastMCP app
|
|
app = FastMCP("MCP Office Tools")
|
|
|
|
# Configuration
|
|
TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir())
|
|
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"
|
|
|
|
|
|
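
# Illustrative environment setup for the two settings read above (example values;
# both variables are optional and the defaults above apply when they are unset):
#   export OFFICE_TEMP_DIR=/tmp/office-tools
#   export DEBUG=true

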
@app.tool()
async def extract_text(
    file_path: str = Field(description="Path to Office document or URL"),
    preserve_formatting: bool = Field(default=False, description="Preserve text formatting and structure"),
    include_metadata: bool = Field(default=True, description="Include document metadata in output"),
    method: str = Field(default="auto", description="Extraction method: auto, primary, fallback")
) -> dict[str, Any]:
    """Extract text content from Office documents with intelligent method selection.

    Supports Word (.docx, .doc), Excel (.xlsx, .xls), PowerPoint (.pptx, .ppt),
    and CSV files. Uses multi-library fallback for maximum compatibility.
    """
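    # Illustrative shape of the returned dict (example values only; the exact keys
    # are assembled in the result dict below):
    #   {"text": "...", "method_used": "python-docx", "character_count": 1234,
    #    "word_count": 210, "extraction_time": 0.42,
    #    "format_info": {"format": "...", "category": "word", "is_legacy": False}}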
    start_time = time.time()

    try:
        # Resolve file path (download if URL)
        local_path = await resolve_office_file_path(file_path)

        # Validate file
        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        # Get format info
        format_info = await detect_format(local_path)
        category = format_info["category"]
        extension = format_info["extension"]

        # Route to appropriate extraction method
        if category == "word":
            text_result = await _extract_word_text(local_path, extension, preserve_formatting, method)
        elif category == "excel":
            text_result = await _extract_excel_text(local_path, extension, preserve_formatting, method)
        elif category == "powerpoint":
            text_result = await _extract_powerpoint_text(local_path, extension, preserve_formatting, method)
        else:
            raise OfficeFileError(f"Unsupported document category: {category}")

        # Compile results
        result = {
            "text": text_result["text"],
            "method_used": text_result["method_used"],
            "character_count": len(text_result["text"]),
            "word_count": len(text_result["text"].split()) if text_result["text"] else 0,
            "extraction_time": round(time.time() - start_time, 3),
            "format_info": {
                "format": format_info["format_name"],
                "category": category,
                "is_legacy": format_info["is_legacy"]
            }
        }

        if include_metadata:
            result["metadata"] = await _extract_basic_metadata(local_path, extension, category)

        if preserve_formatting:
            result["formatted_sections"] = text_result.get("formatted_sections", [])

        return result

    except Exception as e:
        if DEBUG:
            import traceback
            traceback.print_exc()
        raise OfficeFileError(f"Text extraction failed: {str(e)}")


@app.tool()
|
|
async def extract_images(
|
|
file_path: str = Field(description="Path to Office document or URL"),
|
|
output_format: str = Field(default="png", description="Output image format: png, jpg, jpeg"),
|
|
min_width: int = Field(default=100, description="Minimum image width in pixels"),
|
|
min_height: int = Field(default=100, description="Minimum image height in pixels"),
|
|
include_metadata: bool = Field(default=True, description="Include image metadata")
|
|
) -> dict[str, Any]:
|
|
"""Extract images from Office documents with size filtering and format conversion."""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Resolve file path
|
|
local_path = await resolve_office_file_path(file_path)
|
|
|
|
# Validate file
|
|
validation = await validate_office_file(local_path)
|
|
if not validation["is_valid"]:
|
|
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
|
|
|
|
# Get format info
|
|
format_info = await detect_format(local_path)
|
|
category = format_info["category"]
|
|
extension = format_info["extension"]
|
|
|
|
# Extract images based on format
|
|
if category == "word":
|
|
images = await _extract_word_images(local_path, extension, output_format, min_width, min_height)
|
|
elif category == "excel":
|
|
images = await _extract_excel_images(local_path, extension, output_format, min_width, min_height)
|
|
elif category == "powerpoint":
|
|
images = await _extract_powerpoint_images(local_path, extension, output_format, min_width, min_height)
|
|
else:
|
|
raise OfficeFileError(f"Image extraction not supported for category: {category}")
|
|
|
|
result = {
|
|
"images": images,
|
|
"image_count": len(images),
|
|
"extraction_time": round(time.time() - start_time, 3),
|
|
"format_info": {
|
|
"format": format_info["format_name"],
|
|
"category": category
|
|
}
|
|
}
|
|
|
|
if include_metadata:
|
|
result["total_size_bytes"] = sum(img.get("size_bytes", 0) for img in images)
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
if DEBUG:
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise OfficeFileError(f"Image extraction failed: {str(e)}")
|
|
|
|
|
|
@app.tool()
|
|
async def extract_metadata(
|
|
file_path: str = Field(description="Path to Office document or URL")
|
|
) -> dict[str, Any]:
|
|
"""Extract comprehensive metadata from Office documents."""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Resolve file path
|
|
local_path = await resolve_office_file_path(file_path)
|
|
|
|
# Validate file
|
|
validation = await validate_office_file(local_path)
|
|
if not validation["is_valid"]:
|
|
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
|
|
|
|
# Get format info
|
|
format_info = await detect_format(local_path)
|
|
category = format_info["category"]
|
|
extension = format_info["extension"]
|
|
|
|
# Extract metadata based on format
|
|
if category == "word":
|
|
metadata = await _extract_word_metadata(local_path, extension)
|
|
elif category == "excel":
|
|
metadata = await _extract_excel_metadata(local_path, extension)
|
|
elif category == "powerpoint":
|
|
metadata = await _extract_powerpoint_metadata(local_path, extension)
|
|
else:
|
|
metadata = {"category": category, "basic_info": "Limited metadata available"}
|
|
|
|
# Add file system metadata
|
|
path = Path(local_path)
|
|
stat = path.stat()
|
|
|
|
result = {
|
|
"document_metadata": metadata,
|
|
"file_metadata": {
|
|
"filename": path.name,
|
|
"file_size": stat.st_size,
|
|
"created": stat.st_ctime,
|
|
"modified": stat.st_mtime,
|
|
"extension": extension
|
|
},
|
|
"format_info": format_info,
|
|
"extraction_time": round(time.time() - start_time, 3)
|
|
}
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
if DEBUG:
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise OfficeFileError(f"Metadata extraction failed: {str(e)}")
|
|
|
|
|
|
@app.tool()
|
|
async def detect_office_format(
|
|
file_path: str = Field(description="Path to Office document or URL")
|
|
) -> dict[str, Any]:
|
|
"""Intelligent Office document format detection and analysis."""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Resolve file path
|
|
local_path = await resolve_office_file_path(file_path)
|
|
|
|
# Detect format
|
|
format_info = await detect_format(local_path)
|
|
|
|
# Classify document
|
|
classification = await classify_document_type(local_path)
|
|
|
|
result = {
|
|
"format_detection": format_info,
|
|
"document_classification": classification,
|
|
"supported": format_info["is_supported"],
|
|
"processing_recommendations": format_info.get("processing_hints", []),
|
|
"detection_time": round(time.time() - start_time, 3)
|
|
}
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
if DEBUG:
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise OfficeFileError(f"Format detection failed: {str(e)}")
|
|
|
|
|
|
@app.tool()
|
|
async def analyze_document_health(
|
|
file_path: str = Field(description="Path to Office document or URL")
|
|
) -> dict[str, Any]:
|
|
"""Comprehensive document health and integrity analysis."""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Resolve file path
|
|
local_path = await resolve_office_file_path(file_path)
|
|
|
|
# Validate file thoroughly
|
|
validation = await validate_office_file(local_path)
|
|
|
|
# Get format info
|
|
format_info = await detect_format(local_path)
|
|
|
|
# Health assessment
|
|
health_score = _calculate_health_score(validation, format_info)
|
|
|
|
result = {
|
|
"overall_health": "healthy" if validation["is_valid"] and health_score >= 8 else
|
|
"warning" if health_score >= 5 else "problematic",
|
|
"health_score": health_score,
|
|
"validation_results": validation,
|
|
"format_analysis": format_info,
|
|
"recommendations": _get_health_recommendations(validation, format_info),
|
|
"analysis_time": round(time.time() - start_time, 3)
|
|
}
|
|
|
|
return result
|
|
|
|
except Exception as e:
|
|
if DEBUG:
|
|
import traceback
|
|
traceback.print_exc()
|
|
raise OfficeFileError(f"Health analysis failed: {str(e)}")
|
|
|
|
|
|
@app.tool()
async def convert_to_markdown(
    file_path: str = Field(description="Path to Office document or URL"),
    include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"),
    image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"),
    max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"),
    preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"),
    page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). RECOMMENDED for large documents. Empty = all pages"),
    summary_only: bool = Field(default=False, description="Return only metadata and truncated summary. STRONGLY RECOMMENDED for large docs (>10 pages)"),
    output_dir: str = Field(default="", description="Output directory for image files (if image_mode='files')")
) -> dict[str, Any]:
    """Convert Office documents to Markdown format with intelligent processing recommendations.

    ⚠️ RECOMMENDED WORKFLOW FOR LARGE DOCUMENTS (>5 pages):
    1. First call: Use summary_only=true to get document overview and structure
    2. Then: Use page_range (e.g., "1-10", "15-25") to process specific sections

    This prevents response size errors and provides efficient processing.
    Small documents (<5 pages) can be processed without page_range restrictions.
    """
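    # Illustrative two-step usage from an MCP client (arguments are examples):
    #   1) convert_to_markdown(file_path="report.docx", summary_only=True)
    #      -> metadata, a short summary, and a table_of_contents with suggested page ranges
    #   2) convert_to_markdown(file_path="report.docx", page_range="1-8")
    #      -> markdown for just those pages, truncated only if it would exceed the size limit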
    start_time = time.time()

    try:
        # Resolve file path
        local_path = await resolve_office_file_path(file_path)

        # Validate file
        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        # Get format info
        format_info = await detect_format(local_path)
        category = format_info["category"]
        extension = format_info["extension"]

        # Currently focused on Word documents for markdown conversion
        if category != "word":
            raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}")

        # Analyze document size and provide intelligent recommendations
        doc_analysis = await _analyze_document_size(local_path, extension)
        processing_recommendation = _get_processing_recommendation(
            doc_analysis, page_range, summary_only
        )

        # Parse page range if provided
        page_numbers = _parse_page_range(page_range) if page_range else None

        # Convert to markdown based on format
        if extension == ".docx":
            markdown_result = await _convert_docx_to_markdown(
                local_path, include_images, image_mode, max_image_size,
                preserve_structure, page_numbers, summary_only, output_dir
            )
        else:  # .doc
            # For legacy .doc files, use mammoth if available
            markdown_result = await _convert_doc_to_markdown(
                local_path, include_images, image_mode, max_image_size,
                preserve_structure, page_numbers, summary_only, output_dir
            )

        # Build result based on mode
        result = {
            "metadata": {
                "original_file": os.path.basename(local_path),
                "format": format_info["format_name"],
                "conversion_method": markdown_result["method_used"],
                "conversion_time": round(time.time() - start_time, 3),
                "summary_only": summary_only,
                "document_analysis": doc_analysis,
                "processing_recommendation": processing_recommendation
            }
        }

        # Add page range info if used
        if page_range:
            result["metadata"]["page_range"] = page_range
            result["metadata"]["pages_processed"] = len(page_numbers) if page_numbers else 0

        # Add content based on mode
        if summary_only:
            # VERY restrictive summary mode to prevent massive responses
            result["metadata"]["character_count"] = len(markdown_result["content"])
            result["metadata"]["word_count"] = len(markdown_result["content"].split())

            # Ultra-short summary (only 500 chars max)
            result["summary"] = markdown_result["content"][:500] + "..." if len(markdown_result["content"]) > 500 else markdown_result["content"]

            # Severely limit table of contents to prevent 1M+ token responses
            if "table_of_contents" in markdown_result:
                toc = markdown_result["table_of_contents"]
                if "sections" in toc and len(toc["sections"]) > 20:
                    # Limit to first 20 sections only
                    limited_toc = {
                        "sections": toc["sections"][:20],
                        "total_sections": len(toc["sections"]),
                        "showing_first": 20,
                        "note": f"Showing first 20 of {len(toc['sections'])} sections. Use page_range to extract specific sections.",
                        "suggested_chunking": toc.get("suggested_chunking", [])[:10]  # Limit chunking suggestions too
                    }
                    result["table_of_contents"] = limited_toc
                else:
                    result["table_of_contents"] = toc
        else:
            # Include content with automatic size limiting to prevent MCP errors
            content = markdown_result["content"]

            # Apply aggressive content limiting to stay under 25k token limit
            # Rough estimate: ~4 chars per token, leave buffer for metadata
            max_content_chars = 80000  # ~20k tokens worth of content

            if len(content) > max_content_chars:
                # Truncate but try to preserve structure
                truncated_content = _smart_truncate_content(content, max_content_chars)
                result["markdown"] = truncated_content
                result["content_truncated"] = True
                result["original_length"] = len(content)
                result["truncated_length"] = len(truncated_content)
                result["truncation_note"] = f"Content truncated to stay under MCP 25k token limit. Original: {len(content):,} chars, Shown: {len(truncated_content):,} chars. Use smaller page ranges for full content."
            else:
                result["markdown"] = content
                result["content_truncated"] = False

            result["metadata"]["character_count"] = len(content)
            result["metadata"]["word_count"] = len(content.split())

        # Add image info
        if include_images and markdown_result.get("images"):
            result["images"] = markdown_result["images"]
            result["metadata"]["image_count"] = len(markdown_result["images"])
            result["metadata"]["total_image_size"] = sum(
                img.get("size_bytes", 0) for img in markdown_result["images"]
            )

        # Add structure info
        if preserve_structure and markdown_result.get("structure"):
            result["structure"] = markdown_result["structure"]

        return result

    except Exception as e:
        if DEBUG:
            import traceback
            traceback.print_exc()
        raise OfficeFileError(f"Markdown conversion failed: {str(e)}")


@app.tool()
|
|
async def get_supported_formats() -> dict[str, Any]:
|
|
"""Get list of all supported Office document formats and their capabilities."""
|
|
extensions = get_supported_extensions()
|
|
|
|
format_details = {}
|
|
for ext in extensions:
|
|
from .utils.validation import get_format_info
|
|
info = get_format_info(ext)
|
|
if info:
|
|
format_details[ext] = {
|
|
"format_name": info["format_name"],
|
|
"category": info["category"],
|
|
"mime_types": info["mime_types"]
|
|
}
|
|
|
|
return {
|
|
"supported_extensions": extensions,
|
|
"format_details": format_details,
|
|
"categories": {
|
|
"word": [ext for ext, info in format_details.items() if info["category"] == "word"],
|
|
"excel": [ext for ext, info in format_details.items() if info["category"] == "excel"],
|
|
"powerpoint": [ext for ext, info in format_details.items() if info["category"] == "powerpoint"]
|
|
},
|
|
"total_formats": len(extensions)
|
|
}
|
|
|
|
|
|
# Helper functions for text extraction
|
|
async def _extract_word_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
|
|
"""Extract text from Word documents with fallback methods."""
|
|
methods_tried = []
|
|
|
|
# Method selection
|
|
if method == "auto":
|
|
if extension == ".docx":
|
|
method_order = ["python-docx", "mammoth", "docx2txt"]
|
|
else: # .doc
|
|
method_order = ["olefile", "mammoth", "docx2txt"]
|
|
elif method == "primary":
|
|
method_order = ["python-docx"] if extension == ".docx" else ["olefile"]
|
|
else: # fallback
|
|
method_order = ["mammoth", "docx2txt"]
|
|
|
|
text = ""
|
|
formatted_sections = []
|
|
method_used = None
|
|
|
|
for method_name in method_order:
|
|
try:
|
|
methods_tried.append(method_name)
|
|
|
|
if method_name == "python-docx" and extension == ".docx":
|
|
import docx
|
|
doc = docx.Document(file_path)
|
|
|
|
paragraphs = []
|
|
for para in doc.paragraphs:
|
|
paragraphs.append(para.text)
|
|
if preserve_formatting:
|
|
formatted_sections.append({
|
|
"type": "paragraph",
|
|
"text": para.text,
|
|
"style": para.style.name if para.style else None
|
|
})
|
|
|
|
text = "\n".join(paragraphs)
|
|
method_used = "python-docx"
|
|
break
|
|
|
|
elif method_name == "mammoth":
|
|
import mammoth
|
|
|
|
with open(file_path, "rb") as docx_file:
|
|
if preserve_formatting:
|
|
result = mammoth.convert_to_html(docx_file)
|
|
text = result.value
|
|
formatted_sections.append({
|
|
"type": "html",
|
|
"content": result.value
|
|
})
|
|
else:
|
|
result = mammoth.extract_raw_text(docx_file)
|
|
text = result.value
|
|
|
|
method_used = "mammoth"
|
|
break
|
|
|
|
elif method_name == "docx2txt":
|
|
import docx2txt
|
|
text = docx2txt.process(file_path)
|
|
method_used = "docx2txt"
|
|
break
|
|
|
|
elif method_name == "olefile" and extension == ".doc":
|
|
# Basic text extraction for legacy .doc files
|
|
try:
|
|
import olefile
|
|
if olefile.isOleFile(file_path):
|
|
# This is a simplified approach - real .doc parsing is complex
|
|
with open(file_path, 'rb') as f:
|
|
content = f.read()
|
|
# Very basic text extraction attempt
|
|
text = content.decode('utf-8', errors='ignore')
|
|
# Clean up binary artifacts
|
|
import re
|
|
text = re.sub(r'[^\x20-\x7E\n\r\t]', '', text)
|
|
text = '\n'.join(line.strip() for line in text.split('\n') if line.strip())
|
|
method_used = "olefile"
|
|
break
|
|
except Exception:
|
|
continue
|
|
|
|
except ImportError:
|
|
continue
|
|
except Exception:
|
|
continue
|
|
|
|
if not method_used:
|
|
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
|
|
|
|
return {
|
|
"text": text,
|
|
"method_used": method_used,
|
|
"methods_tried": methods_tried,
|
|
"formatted_sections": formatted_sections
|
|
}
|
|
|
|
|
|
async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
|
|
"""Extract text from Excel documents."""
|
|
methods_tried = []
|
|
|
|
if extension == ".csv":
|
|
# CSV handling
|
|
import pandas as pd
|
|
try:
|
|
df = pd.read_csv(file_path)
|
|
text = df.to_string()
|
|
return {
|
|
"text": text,
|
|
"method_used": "pandas",
|
|
"methods_tried": ["pandas"],
|
|
"formatted_sections": [{"type": "table", "data": df.to_dict()}] if preserve_formatting else []
|
|
}
|
|
except Exception as e:
|
|
raise OfficeFileError(f"CSV processing failed: {str(e)}")
|
|
|
|
# Excel file handling
|
|
text = ""
|
|
formatted_sections = []
|
|
method_used = None
|
|
|
|
method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"]
|
|
|
|
for method_name in method_order:
|
|
try:
|
|
methods_tried.append(method_name)
|
|
|
|
if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]:
|
|
import openpyxl
|
|
wb = openpyxl.load_workbook(file_path, data_only=True)
|
|
|
|
text_parts = []
|
|
for sheet_name in wb.sheetnames:
|
|
ws = wb[sheet_name]
|
|
text_parts.append(f"Sheet: {sheet_name}")
|
|
|
|
for row in ws.iter_rows(values_only=True):
|
|
row_text = "\t".join(str(cell) if cell is not None else "" for cell in row)
|
|
if row_text.strip():
|
|
text_parts.append(row_text)
|
|
|
|
if preserve_formatting:
|
|
formatted_sections.append({
|
|
"type": "worksheet",
|
|
"name": sheet_name,
|
|
"data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()]
|
|
})
|
|
|
|
text = "\n".join(text_parts)
|
|
method_used = "openpyxl"
|
|
break
|
|
|
|
elif method_name == "pandas":
|
|
import pandas as pd
|
|
|
|
if extension in [".xlsx", ".xlsm"]:
|
|
dfs = pd.read_excel(file_path, sheet_name=None)
|
|
else: # .xls
|
|
dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd')
|
|
|
|
text_parts = []
|
|
for sheet_name, df in dfs.items():
|
|
text_parts.append(f"Sheet: {sheet_name}")
|
|
text_parts.append(df.to_string())
|
|
|
|
if preserve_formatting:
|
|
formatted_sections.append({
|
|
"type": "dataframe",
|
|
"name": sheet_name,
|
|
"data": df.to_dict()
|
|
})
|
|
|
|
text = "\n\n".join(text_parts)
|
|
method_used = "pandas"
|
|
break
|
|
|
|
elif method_name == "xlrd" and extension == ".xls":
|
|
import xlrd
|
|
wb = xlrd.open_workbook(file_path)
|
|
|
|
text_parts = []
|
|
for sheet in wb.sheets():
|
|
text_parts.append(f"Sheet: {sheet.name}")
|
|
|
|
for row_idx in range(sheet.nrows):
|
|
row = sheet.row_values(row_idx)
|
|
row_text = "\t".join(str(cell) for cell in row)
|
|
text_parts.append(row_text)
|
|
|
|
text = "\n".join(text_parts)
|
|
method_used = "xlrd"
|
|
break
|
|
|
|
except ImportError:
|
|
continue
|
|
except Exception:
|
|
continue
|
|
|
|
if not method_used:
|
|
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
|
|
|
|
return {
|
|
"text": text,
|
|
"method_used": method_used,
|
|
"methods_tried": methods_tried,
|
|
"formatted_sections": formatted_sections
|
|
}
|
|
|
|
|
|
async def _extract_powerpoint_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
|
|
"""Extract text from PowerPoint documents."""
|
|
methods_tried = []
|
|
|
|
if extension == ".pptx":
|
|
try:
|
|
import pptx
|
|
prs = pptx.Presentation(file_path)
|
|
|
|
text_parts = []
|
|
formatted_sections = []
|
|
|
|
for slide_num, slide in enumerate(prs.slides, 1):
|
|
slide_text_parts = []
|
|
|
|
for shape in slide.shapes:
|
|
if hasattr(shape, "text") and shape.text:
|
|
slide_text_parts.append(shape.text)
|
|
|
|
slide_text = "\n".join(slide_text_parts)
|
|
text_parts.append(f"Slide {slide_num}:\n{slide_text}")
|
|
|
|
if preserve_formatting:
|
|
formatted_sections.append({
|
|
"type": "slide",
|
|
"number": slide_num,
|
|
"text": slide_text,
|
|
"shapes": len(slide.shapes)
|
|
})
|
|
|
|
text = "\n\n".join(text_parts)
|
|
|
|
return {
|
|
"text": text,
|
|
"method_used": "python-pptx",
|
|
"methods_tried": ["python-pptx"],
|
|
"formatted_sections": formatted_sections
|
|
}
|
|
|
|
except ImportError:
|
|
methods_tried.append("python-pptx")
|
|
except Exception:
|
|
methods_tried.append("python-pptx")
|
|
|
|
# Legacy .ppt handling would require additional libraries
|
|
if extension == ".ppt":
|
|
raise OfficeFileError("Legacy PowerPoint (.ppt) text extraction requires additional setup")
|
|
|
|
raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")
|
|
|
|
|
|
# Helper functions for image extraction
|
|
async def _extract_word_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
|
|
"""Extract images from Word documents."""
|
|
images = []
|
|
|
|
if extension == ".docx":
|
|
try:
|
|
import io
|
|
import zipfile
|
|
|
|
from PIL import Image
|
|
|
|
with zipfile.ZipFile(file_path, 'r') as zip_file:
|
|
# Look for images in media folder
|
|
image_files = [f for f in zip_file.namelist() if f.startswith('word/media/')]
|
|
|
|
for i, img_path in enumerate(image_files):
|
|
try:
|
|
img_data = zip_file.read(img_path)
|
|
img = Image.open(io.BytesIO(img_data))
|
|
|
|
# Size filtering
|
|
if img.width >= min_width and img.height >= min_height:
|
|
# Save to temp file
|
|
temp_path = os.path.join(TEMP_DIR, f"word_image_{i}.{output_format}")
|
|
img.save(temp_path, format=output_format.upper())
|
|
|
|
images.append({
|
|
"index": i,
|
|
"filename": os.path.basename(img_path),
|
|
"path": temp_path,
|
|
"width": img.width,
|
|
"height": img.height,
|
|
"format": img.format,
|
|
"size_bytes": len(img_data)
|
|
})
|
|
except Exception:
|
|
continue
|
|
|
|
except Exception as e:
|
|
raise OfficeFileError(f"Word image extraction failed: {str(e)}")
|
|
|
|
return images
|
|
|
|
|
|
async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
|
|
"""Extract images from Excel documents."""
|
|
images = []
|
|
|
|
if extension in [".xlsx", ".xlsm"]:
|
|
try:
|
|
import io
|
|
import zipfile
|
|
|
|
from PIL import Image
|
|
|
|
with zipfile.ZipFile(file_path, 'r') as zip_file:
|
|
# Look for images in media folder
|
|
image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')]
|
|
|
|
for i, img_path in enumerate(image_files):
|
|
try:
|
|
img_data = zip_file.read(img_path)
|
|
img = Image.open(io.BytesIO(img_data))
|
|
|
|
# Size filtering
|
|
if img.width >= min_width and img.height >= min_height:
|
|
# Save to temp file
|
|
temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}")
|
|
img.save(temp_path, format=output_format.upper())
|
|
|
|
images.append({
|
|
"index": i,
|
|
"filename": os.path.basename(img_path),
|
|
"path": temp_path,
|
|
"width": img.width,
|
|
"height": img.height,
|
|
"format": img.format,
|
|
"size_bytes": len(img_data)
|
|
})
|
|
except Exception:
|
|
continue
|
|
|
|
except Exception as e:
|
|
raise OfficeFileError(f"Excel image extraction failed: {str(e)}")
|
|
|
|
return images
|
|
|
|
|
|
async def _extract_powerpoint_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
|
|
"""Extract images from PowerPoint documents."""
|
|
images = []
|
|
|
|
if extension == ".pptx":
|
|
try:
|
|
import io
|
|
import zipfile
|
|
|
|
from PIL import Image
|
|
|
|
with zipfile.ZipFile(file_path, 'r') as zip_file:
|
|
# Look for images in media folder
|
|
image_files = [f for f in zip_file.namelist() if f.startswith('ppt/media/')]
|
|
|
|
for i, img_path in enumerate(image_files):
|
|
try:
|
|
img_data = zip_file.read(img_path)
|
|
img = Image.open(io.BytesIO(img_data))
|
|
|
|
# Size filtering
|
|
if img.width >= min_width and img.height >= min_height:
|
|
# Save to temp file
|
|
temp_path = os.path.join(TEMP_DIR, f"powerpoint_image_{i}.{output_format}")
|
|
img.save(temp_path, format=output_format.upper())
|
|
|
|
images.append({
|
|
"index": i,
|
|
"filename": os.path.basename(img_path),
|
|
"path": temp_path,
|
|
"width": img.width,
|
|
"height": img.height,
|
|
"format": img.format,
|
|
"size_bytes": len(img_data)
|
|
})
|
|
except Exception:
|
|
continue
|
|
|
|
except Exception as e:
|
|
raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}")
|
|
|
|
return images
|
|
|
|
|
|
# Helper functions for metadata extraction
|
|
async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]:
|
|
"""Extract basic metadata from Office documents."""
|
|
metadata = {"category": category, "extension": extension}
|
|
|
|
try:
|
|
if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]:
|
|
import zipfile
|
|
|
|
with zipfile.ZipFile(file_path, 'r') as zip_file:
|
|
# Core properties
|
|
if 'docProps/core.xml' in zip_file.namelist():
|
|
zip_file.read('docProps/core.xml').decode('utf-8')
|
|
metadata["has_core_properties"] = True
|
|
|
|
# App properties
|
|
if 'docProps/app.xml' in zip_file.namelist():
|
|
zip_file.read('docProps/app.xml').decode('utf-8')
|
|
metadata["has_app_properties"] = True
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
return metadata
|
|
|
|
|
|
async def _extract_word_metadata(file_path: str, extension: str) -> dict[str, Any]:
|
|
"""Extract Word-specific metadata."""
|
|
metadata = {"type": "word", "extension": extension}
|
|
|
|
if extension == ".docx":
|
|
try:
|
|
import docx
|
|
doc = docx.Document(file_path)
|
|
|
|
core_props = doc.core_properties
|
|
metadata.update({
|
|
"title": core_props.title,
|
|
"author": core_props.author,
|
|
"subject": core_props.subject,
|
|
"keywords": core_props.keywords,
|
|
"comments": core_props.comments,
|
|
"created": str(core_props.created) if core_props.created else None,
|
|
"modified": str(core_props.modified) if core_props.modified else None
|
|
})
|
|
|
|
# Document structure
|
|
metadata.update({
|
|
"paragraph_count": len(doc.paragraphs),
|
|
"section_count": len(doc.sections),
|
|
"has_tables": len(doc.tables) > 0,
|
|
"table_count": len(doc.tables)
|
|
})
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
return metadata
|
|
|
|
|
|
async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]:
|
|
"""Extract Excel-specific metadata."""
|
|
metadata = {"type": "excel", "extension": extension}
|
|
|
|
if extension in [".xlsx", ".xlsm"]:
|
|
try:
|
|
import openpyxl
|
|
wb = openpyxl.load_workbook(file_path)
|
|
|
|
props = wb.properties
|
|
metadata.update({
|
|
"title": props.title,
|
|
"creator": props.creator,
|
|
"subject": props.subject,
|
|
"description": props.description,
|
|
"keywords": props.keywords,
|
|
"created": str(props.created) if props.created else None,
|
|
"modified": str(props.modified) if props.modified else None
|
|
})
|
|
|
|
# Workbook structure
|
|
metadata.update({
|
|
"worksheet_count": len(wb.worksheets),
|
|
"worksheet_names": wb.sheetnames,
|
|
"has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets),
|
|
"has_images": any(len(ws._images) > 0 for ws in wb.worksheets)
|
|
})
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
return metadata
|
|
|
|
|
|
async def _extract_powerpoint_metadata(file_path: str, extension: str) -> dict[str, Any]:
|
|
"""Extract PowerPoint-specific metadata."""
|
|
metadata = {"type": "powerpoint", "extension": extension}
|
|
|
|
if extension == ".pptx":
|
|
try:
|
|
import pptx
|
|
prs = pptx.Presentation(file_path)
|
|
|
|
core_props = prs.core_properties
|
|
metadata.update({
|
|
"title": core_props.title,
|
|
"author": core_props.author,
|
|
"subject": core_props.subject,
|
|
"keywords": core_props.keywords,
|
|
"comments": core_props.comments,
|
|
"created": str(core_props.created) if core_props.created else None,
|
|
"modified": str(core_props.modified) if core_props.modified else None
|
|
})
|
|
|
|
# Presentation structure
|
|
slide_layouts = set()
|
|
total_shapes = 0
|
|
|
|
for slide in prs.slides:
|
|
slide_layouts.add(slide.slide_layout.name)
|
|
total_shapes += len(slide.shapes)
|
|
|
|
metadata.update({
|
|
"slide_count": len(prs.slides),
|
|
"slide_layouts": list(slide_layouts),
|
|
"total_shapes": total_shapes,
|
|
"slide_width": prs.slide_width,
|
|
"slide_height": prs.slide_height
|
|
})
|
|
|
|
except Exception:
|
|
pass
|
|
|
|
return metadata
|
|
|
|
|
|
def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int:
|
|
"""Calculate document health score (1-10)."""
|
|
score = 10
|
|
|
|
# Deduct for validation errors
|
|
if not validation["is_valid"]:
|
|
score -= 5
|
|
|
|
if validation["errors"]:
|
|
score -= len(validation["errors"]) * 2
|
|
|
|
if validation["warnings"]:
|
|
score -= len(validation["warnings"])
|
|
|
|
# Deduct for problematic characteristics
|
|
if validation.get("password_protected"):
|
|
score -= 1
|
|
|
|
if format_info.get("is_legacy"):
|
|
score -= 1
|
|
|
|
structure = format_info.get("structure", {})
|
|
if structure.get("estimated_complexity") == "complex":
|
|
score -= 1
|
|
|
|
return max(1, min(10, score))
|
|
|
|
|
|
def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]:
|
|
"""Get health improvement recommendations."""
|
|
recommendations = []
|
|
|
|
if validation["errors"]:
|
|
recommendations.append("Fix validation errors before processing")
|
|
|
|
if validation.get("password_protected"):
|
|
recommendations.append("Remove password protection if possible")
|
|
|
|
if format_info.get("is_legacy"):
|
|
recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)")
|
|
|
|
structure = format_info.get("structure", {})
|
|
if structure.get("estimated_complexity") == "complex":
|
|
recommendations.append("Complex document may require specialized processing")
|
|
|
|
if not recommendations:
|
|
recommendations.append("Document appears healthy and ready for processing")
|
|
|
|
return recommendations
|
|
|
|
|
|
# Markdown conversion helper functions
|
|
async def _convert_docx_to_markdown(
|
|
file_path: str,
|
|
include_images: bool,
|
|
image_mode: str,
|
|
max_image_size: int,
|
|
preserve_structure: bool,
|
|
page_numbers: list[int],
|
|
summary_only: bool,
|
|
output_dir: str
|
|
) -> dict[str, Any]:
|
|
"""Convert .docx file to markdown with comprehensive feature support."""
|
|
import base64
|
|
|
|
# ULTRA-FAST summary mode - skip all complex processing
|
|
if summary_only:
|
|
return await _get_ultra_fast_summary(file_path)
|
|
|
|
# If page_numbers is specified, we need to use python-docx for page-based extraction
|
|
# as mammoth processes the entire document
|
|
if page_numbers:
|
|
return await _convert_docx_with_python_docx(
|
|
file_path, include_images, image_mode, max_image_size,
|
|
preserve_structure, page_numbers, summary_only, output_dir
|
|
)
|
|
|
|
try:
|
|
# Try mammoth first for better HTML->Markdown conversion (full document only)
|
|
import mammoth
|
|
|
|
# Configure mammoth for markdown-friendly output
|
|
with open(file_path, "rb") as docx_file:
|
|
if include_images:
|
|
# Extract images and handle them based on mode
|
|
images_info = []
|
|
|
|
def convert_image(image):
|
|
image_data = image.open()
|
|
content_type = image.content_type
|
|
ext = content_type.split('/')[-1] if '/' in content_type else 'png'
|
|
|
|
if image_mode == "base64":
|
|
if len(image_data) <= max_image_size:
|
|
encoded = base64.b64encode(image_data).decode('utf-8')
|
|
images_info.append({
|
|
"filename": f"image_{len(images_info)}.{ext}",
|
|
"content_type": content_type,
|
|
"size_bytes": len(image_data),
|
|
"mode": "base64"
|
|
})
|
|
return {
|
|
"src": f"data:{content_type};base64,{encoded}"
|
|
}
|
|
else:
|
|
# Too large for base64, fall back to reference
|
|
filename = f"large_image_{len(images_info)}.{ext}"
|
|
images_info.append({
|
|
"filename": filename,
|
|
"content_type": content_type,
|
|
"size_bytes": len(image_data),
|
|
"mode": "reference",
|
|
"note": "Too large for base64 encoding"
|
|
})
|
|
return {"src": filename}
|
|
|
|
elif image_mode == "files":
|
|
# Save image to file
|
|
nonlocal output_dir
|
|
if not output_dir:
|
|
output_dir = os.path.join(TEMP_DIR, "markdown_images")
|
|
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
filename = f"image_{len(images_info)}.{ext}"
|
|
file_path = os.path.join(output_dir, filename)
|
|
|
|
with open(file_path, 'wb') as img_file:
|
|
img_file.write(image_data)
|
|
|
|
images_info.append({
|
|
"filename": filename,
|
|
"file_path": file_path,
|
|
"content_type": content_type,
|
|
"size_bytes": len(image_data),
|
|
"mode": "file"
|
|
})
|
|
return {"src": file_path}
|
|
|
|
else: # references
|
|
filename = f"image_{len(images_info)}.{ext}"
|
|
images_info.append({
|
|
"filename": filename,
|
|
"content_type": content_type,
|
|
"size_bytes": len(image_data),
|
|
"mode": "reference"
|
|
})
|
|
return {"src": filename}
|
|
|
|
# Convert with image handling
|
|
result = mammoth.convert_to_html(
|
|
docx_file,
|
|
convert_image=mammoth.images.img_element(convert_image)
|
|
)
|
|
|
|
html_content = result.value
|
|
markdown_content = _html_to_markdown(html_content, preserve_structure)
|
|
|
|
conversion_result = {
|
|
"content": markdown_content,
|
|
"method_used": "mammoth-with-images",
|
|
"images": images_info
|
|
}
|
|
|
|
else:
|
|
# Convert without images
|
|
result = mammoth.convert_to_markdown(docx_file)
|
|
markdown_content = result.value
|
|
|
|
conversion_result = {
|
|
"content": markdown_content,
|
|
"method_used": "mammoth-markdown",
|
|
"images": []
|
|
}
|
|
|
|
# Handle summary mode
|
|
if summary_only and len(markdown_content) > 5000:
|
|
# For summary mode, truncate large content
|
|
markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
|
|
|
|
# Update the conversion result
|
|
conversion_result["content"] = markdown_content
|
|
|
|
# Extract structure information
|
|
if preserve_structure:
|
|
structure = _extract_markdown_structure(markdown_content)
|
|
conversion_result["structure"] = structure
|
|
|
|
return conversion_result
|
|
|
|
except ImportError:
|
|
# Fall back to python-docx with custom markdown conversion
|
|
return await _convert_docx_with_python_docx(
|
|
file_path, include_images, image_mode, max_image_size,
|
|
preserve_structure, page_numbers, summary_only, output_dir
|
|
)
|
|
except Exception:
|
|
# Fall back to python-docx
|
|
return await _convert_docx_with_python_docx(
|
|
file_path, include_images, image_mode, max_image_size,
|
|
preserve_structure, page_numbers, summary_only, output_dir
|
|
)
|
|
|
|
|
|
async def _convert_docx_with_python_docx(
|
|
file_path: str,
|
|
include_images: bool,
|
|
image_mode: str,
|
|
max_image_size: int,
|
|
preserve_structure: bool,
|
|
page_numbers: list[int],
|
|
summary_only: bool,
|
|
output_dir: str
|
|
) -> dict[str, Any]:
|
|
"""Convert .docx using python-docx with custom markdown conversion."""
|
|
import base64
|
|
|
|
import docx
|
|
from docx.oxml.table import CT_Tbl
|
|
from docx.oxml.text.paragraph import CT_P
|
|
from docx.table import Table
|
|
from docx.text.paragraph import Paragraph
|
|
|
|
doc = docx.Document(file_path)
|
|
markdown_parts = []
|
|
images_info = []
|
|
structure_info = {"headings": [], "tables": 0, "lists": 0, "paragraphs": 0}
|
|
|
|
# Extract images if requested
|
|
if include_images:
|
|
extracted_images = await _extract_word_images(file_path, ".docx", "png", 1, 1)
|
|
for i, img in enumerate(extracted_images):
|
|
if image_mode == "base64":
|
|
if img.get("size_bytes", 0) <= max_image_size:
|
|
with open(img["path"], "rb") as img_file:
|
|
img_data = img_file.read()
|
|
encoded = base64.b64encode(img_data).decode('utf-8')
|
|
images_info.append({
|
|
"filename": img["filename"],
|
|
"content_type": f"image/{img.get('format', 'png').lower()}",
|
|
"size_bytes": img.get("size_bytes", 0),
|
|
"mode": "base64",
|
|
"markdown_ref": f".lower()};base64,{encoded})"
|
|
})
|
|
else:
|
|
images_info.append({
|
|
"filename": img["filename"],
|
|
"size_bytes": img.get("size_bytes", 0),
|
|
"mode": "reference",
|
|
"markdown_ref": f"",
|
|
"note": "Too large for base64 encoding"
|
|
})
|
|
elif image_mode == "files":
|
|
images_info.append({
|
|
"filename": img["filename"],
|
|
"file_path": img["path"],
|
|
"size_bytes": img.get("size_bytes", 0),
|
|
"mode": "file",
|
|
"markdown_ref": f""
|
|
})
|
|
else: # references
|
|
images_info.append({
|
|
"filename": img["filename"],
|
|
"size_bytes": img.get("size_bytes", 0),
|
|
"mode": "reference",
|
|
"markdown_ref": f""
|
|
})
|
|
|
|
# Process document elements with page filtering if specified
|
|
current_page = 1
|
|
include_current_page = not page_numbers or current_page in page_numbers
|
|
table_of_contents = [] # Track headings with page numbers for TOC
|
|
|
|
for element in doc.element.body:
|
|
if isinstance(element, CT_P):
|
|
paragraph = Paragraph(element, doc)
|
|
|
|
# Check for page breaks
|
|
if _has_page_break(paragraph):
|
|
current_page += 1
|
|
include_current_page = not page_numbers or current_page in page_numbers
|
|
continue
|
|
|
|
# Only process content from specified pages
|
|
if include_current_page:
|
|
markdown_text = _paragraph_to_markdown(paragraph, preserve_structure)
|
|
if markdown_text.strip():
|
|
markdown_parts.append(markdown_text)
|
|
structure_info["paragraphs"] += 1
|
|
|
|
# Track headings for both structure and TOC
|
|
if preserve_structure and markdown_text.startswith('#'):
|
|
level = len(markdown_text) - len(markdown_text.lstrip('#'))
|
|
heading_text = markdown_text.lstrip('# ').strip()
|
|
heading_info = {
|
|
"level": level,
|
|
"text": heading_text,
|
|
"position": len(markdown_parts) - 1,
|
|
"page": current_page
|
|
}
|
|
structure_info["headings"].append(heading_info)
|
|
|
|
# Add to table of contents
|
|
table_of_contents.append({
|
|
"level": level,
|
|
"title": heading_text,
|
|
"page": current_page,
|
|
"suggested_page_range": f"{current_page}-{current_page + _estimate_section_length(level)}"
|
|
})
|
|
|
|
elif isinstance(element, CT_Tbl):
|
|
# Only process tables from specified pages
|
|
if include_current_page:
|
|
table = Table(element, doc)
|
|
table_markdown = _table_to_markdown(table)
|
|
if table_markdown.strip():
|
|
markdown_parts.append(table_markdown)
|
|
structure_info["tables"] += 1
|
|
|
|
# Add image references at the end if any
|
|
if include_images and images_info:
|
|
markdown_parts.append("\n## Images\n")
|
|
for img in images_info:
|
|
markdown_parts.append(img["markdown_ref"])
|
|
|
|
markdown_content = "\n\n".join(markdown_parts)
|
|
|
|
result = {
|
|
"content": markdown_content,
|
|
"method_used": "python-docx-custom",
|
|
"images": images_info
|
|
}
|
|
|
|
# Add table of contents for navigation
|
|
if table_of_contents:
|
|
result["table_of_contents"] = _optimize_toc_page_ranges(table_of_contents)
|
|
|
|
# Add page filtering info
|
|
if page_numbers:
|
|
result["pages_processed"] = page_numbers
|
|
result["total_pages_in_range"] = len(page_numbers)
|
|
|
|
# Handle summary mode
|
|
if summary_only and len(markdown_content) > 5000:
|
|
markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
|
|
|
|
# Update the result content
|
|
result["content"] = markdown_content
|
|
|
|
# Add structure info
|
|
if preserve_structure:
|
|
result["structure"] = structure_info
|
|
|
|
return result
|
|
|
|
|
|
async def _convert_doc_to_markdown(
|
|
file_path: str,
|
|
include_images: bool,
|
|
image_mode: str,
|
|
max_image_size: int,
|
|
preserve_structure: bool,
|
|
page_numbers: list[int],
|
|
summary_only: bool,
|
|
output_dir: str
|
|
) -> dict[str, Any]:
|
|
"""Convert legacy .doc file to markdown using available methods."""
|
|
try:
|
|
import mammoth
|
|
|
|
with open(file_path, "rb") as doc_file:
|
|
result = mammoth.convert_to_markdown(doc_file)
|
|
markdown_content = result.value
|
|
|
|
conversion_result = {
|
|
"content": markdown_content,
|
|
"method_used": "mammoth-doc",
|
|
"images": [] # Legacy .doc image extraction is complex
|
|
}
|
|
|
|
# Handle summary mode
|
|
if summary_only and len(markdown_content) > 5000:
|
|
markdown_content = markdown_content[:5000] + "\n\n[Content truncated - use summary_only=false for full content]"
|
|
|
|
# Update the conversion result
|
|
conversion_result["content"] = markdown_content
|
|
|
|
if preserve_structure:
|
|
structure = _extract_markdown_structure(markdown_content)
|
|
conversion_result["structure"] = structure
|
|
|
|
return conversion_result
|
|
|
|
except ImportError:
|
|
raise OfficeFileError("Legacy .doc conversion requires mammoth library")
|
|
except Exception as e:
|
|
raise OfficeFileError(f"Legacy .doc conversion failed: {str(e)}")
|
|
|
|
|
|
def _paragraph_to_markdown(paragraph, preserve_structure: bool) -> str:
|
|
"""Convert a Word paragraph to markdown format."""
|
|
text = paragraph.text.strip()
|
|
if not text:
|
|
return ""
|
|
|
|
if not preserve_structure:
|
|
return text
|
|
|
|
# Handle different paragraph styles
|
|
style_name = paragraph.style.name.lower() if paragraph.style else ""
|
|
|
|
if "heading" in style_name:
|
|
# Extract heading level from style name
|
|
import re
|
|
level_match = re.search(r'(\d+)', style_name)
|
|
level = int(level_match.group(1)) if level_match else 1
|
|
return f"{'#' * level} {text}"
|
|
elif "title" in style_name:
|
|
return f"# {text}"
|
|
elif "subtitle" in style_name:
|
|
return f"## {text}"
|
|
elif style_name in ["list paragraph", "list"]:
|
|
return f"- {text}"
|
|
elif "quote" in style_name:
|
|
return f"> {text}"
|
|
else:
|
|
return text
|
|
|
|
|
|
def _table_to_markdown(table) -> str:
|
|
"""Convert a Word table to markdown format."""
|
|
markdown_rows = []
|
|
|
|
for i, row in enumerate(table.rows):
|
|
cells = [cell.text.strip().replace('\n', ' ') for cell in row.cells]
|
|
markdown_row = "| " + " | ".join(cells) + " |"
|
|
markdown_rows.append(markdown_row)
|
|
|
|
# Add header separator after first row
|
|
if i == 0:
|
|
separator = "| " + " | ".join(["---"] * len(cells)) + " |"
|
|
markdown_rows.append(separator)
|
|
|
|
return "\n".join(markdown_rows)
|
|
|
|
|
|
def _html_to_markdown(html_content: str, preserve_structure: bool) -> str:
|
|
"""Convert HTML content to markdown format."""
|
|
import re
|
|
|
|
# Basic HTML to Markdown conversions
|
|
conversions = [
|
|
(r'<h1[^>]*>(.*?)</h1>', r'# \1'),
|
|
(r'<h2[^>]*>(.*?)</h2>', r'## \1'),
|
|
(r'<h3[^>]*>(.*?)</h3>', r'### \1'),
|
|
(r'<h4[^>]*>(.*?)</h4>', r'#### \1'),
|
|
(r'<h5[^>]*>(.*?)</h5>', r'##### \1'),
|
|
(r'<h6[^>]*>(.*?)</h6>', r'###### \1'),
|
|
(r'<strong[^>]*>(.*?)</strong>', r'**\1**'),
|
|
(r'<b[^>]*>(.*?)</b>', r'**\1**'),
|
|
(r'<em[^>]*>(.*?)</em>', r'*\1*'),
|
|
(r'<i[^>]*>(.*?)</i>', r'*\1*'),
|
|
(r'<code[^>]*>(.*?)</code>', r'`\1`'),
|
|
(r'<a[^>]*href="([^"]*)"[^>]*>(.*?)</a>', r'[\2](\1)'),
|
|
(r'<img[^>]*src="([^"]*)"[^>]*/?>', r'![](\1)'),
|
|
(r'<p[^>]*>(.*?)</p>', r'\1\n'),
|
|
(r'<br[^>]*/?>', r'\n'),
|
|
(r'<li[^>]*>(.*?)</li>', r'- \1'),
|
|
(r'<ul[^>]*>(.*?)</ul>', r'\1'),
|
|
(r'<ol[^>]*>(.*?)</ol>', r'\1'),
|
|
(r'<blockquote[^>]*>(.*?)</blockquote>', r'> \1'),
|
|
]
|
|
|
|
markdown = html_content
|
|
for pattern, replacement in conversions:
|
|
markdown = re.sub(pattern, replacement, markdown, flags=re.DOTALL | re.IGNORECASE)
|
|
|
|
# Clean up extra whitespace
|
|
markdown = re.sub(r'\n\s*\n\s*\n', '\n\n', markdown)
|
|
markdown = re.sub(r'^\s+|\s+$', '', markdown, flags=re.MULTILINE)
|
|
|
|
return markdown
|
|
|
|
|
|
def _chunk_markdown(content: str, chunk_size: int) -> list[dict[str, Any]]:
|
|
"""Split markdown content into chunks while preserving structure."""
|
|
chunks = []
|
|
lines = content.split('\n')
|
|
current_chunk = []
|
|
current_size = 0
|
|
chunk_num = 1
|
|
|
|
for line in lines:
|
|
line_size = len(line) + 1 # +1 for newline
|
|
|
|
# If adding this line would exceed chunk size and we have content
|
|
if current_size + line_size > chunk_size and current_chunk:
|
|
chunks.append({
|
|
"chunk_number": chunk_num,
|
|
"content": '\n'.join(current_chunk),
|
|
"character_count": current_size,
|
|
"line_count": len(current_chunk)
|
|
})
|
|
current_chunk = []
|
|
current_size = 0
|
|
chunk_num += 1
|
|
|
|
current_chunk.append(line)
|
|
current_size += line_size
|
|
|
|
# Add final chunk if there's remaining content
|
|
if current_chunk:
|
|
chunks.append({
|
|
"chunk_number": chunk_num,
|
|
"content": '\n'.join(current_chunk),
|
|
"character_count": current_size,
|
|
"line_count": len(current_chunk)
|
|
})
|
|
|
|
return chunks
|
|
|
|
|
|
def _extract_markdown_structure(content: str) -> dict[str, Any]:
|
|
"""Extract structure information from markdown content."""
|
|
import re
|
|
|
|
structure = {
|
|
"headings": [],
|
|
"lists": 0,
|
|
"links": 0,
|
|
"images": 0,
|
|
"code_blocks": 0,
|
|
"tables": 0,
|
|
"line_count": len(content.split('\n'))
|
|
}
|
|
|
|
lines = content.split('\n')
|
|
for i, line in enumerate(lines):
|
|
# Find headings
|
|
heading_match = re.match(r'^(#{1,6})\s+(.+)', line)
|
|
if heading_match:
|
|
level = len(heading_match.group(1))
|
|
text = heading_match.group(2).strip()
|
|
structure["headings"].append({
|
|
"level": level,
|
|
"text": text,
|
|
"line_number": i + 1
|
|
})
|
|
|
|
# Count other elements
|
|
if re.match(r'^[-*+]\s+', line):
|
|
structure["lists"] += 1
|
|
|
|
structure["links"] += len(re.findall(r'\[([^\]]+)\]\([^)]+\)', line))
|
|
structure["images"] += len(re.findall(r'!\[([^\]]*)\]\([^)]+\)', line))
|
|
|
|
if line.strip().startswith('```'):
|
|
structure["code_blocks"] += 1
|
|
|
|
if '|' in line and line.count('|') >= 2:
|
|
structure["tables"] += 1
|
|
|
|
return structure
|
|
|
|
|
|
async def _get_ultra_fast_summary(file_path: str) -> dict[str, Any]:
    """Ultra-fast summary that extracts minimal data to prevent MCP token limits."""
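    # Illustrative shape of the returned summary (example values; keys match the
    # return statements below):
    #   {"content": "# Heading\n\nFirst paragraph...",
    #    "method_used": "ultra-fast-summary",
    #    "table_of_contents": {"note": "...", "basic_info": "...",
    #                          "bookmarks": ["Chapter_1"], "bookmark_count": 1,
    #                          "bookmark_note": "..."}}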
    try:
        import docx
        doc = docx.Document(file_path)

        # Extract only the first few paragraphs and major headings
        content_parts = []
        heading_count = 0
        paragraph_count = 0
        max_content_length = 2000  # Very short limit
        current_length = 0

        # Get basic structure info quickly
        total_paragraphs = len(doc.paragraphs)
        total_tables = len(doc.tables)

        # Extract bookmarks (chapter markers)
        bookmarks = []
        try:
            # Access the document's bookmarks through the XML. python-docx element
            # xpath() already registers the 'w:' prefix and takes no namespaces
            # argument, so the prefixed path is passed directly.
            for bookmark in doc.element.xpath('//w:bookmarkStart'):
                bookmark_name = bookmark.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}name')
                if bookmark_name and not bookmark_name.startswith('_'):  # Skip system bookmarks
                    bookmarks.append(bookmark_name)
        except Exception:
            pass  # Bookmark extraction failed, continue without

        # Extract just a few key headings and the start of content
        for para in doc.paragraphs[:50]:  # Only check first 50 paragraphs
            text = para.text.strip()
            if not text:
                continue

            # Check if it's a heading (simple heuristic)
            is_heading = (para.style and "heading" in para.style.name.lower()) or len(text) < 100

            if is_heading and heading_count < 10:  # Max 10 headings
                content_parts.append(f"# {text}")
                heading_count += 1
                current_length += len(text) + 3
            elif paragraph_count < 5 and current_length < max_content_length:  # Max 5 paragraphs
                content_parts.append(text)
                paragraph_count += 1
                current_length += len(text)

            if current_length > max_content_length:
                break

        # Create very basic summary
        summary_content = "\n\n".join(content_parts)

        return {
            "content": summary_content,
            "method_used": "ultra-fast-summary",
            "table_of_contents": {
                "note": "Use full document processing for detailed TOC",
                "basic_info": f"Document has ~{total_paragraphs} paragraphs, {total_tables} tables, {heading_count} headings found in first scan",
                "bookmarks": bookmarks[:20] if bookmarks else [],  # Limit to first 20 bookmarks
                "bookmark_count": len(bookmarks),
                "bookmark_note": "Bookmarks often indicate chapter starts. Use these as navigation hints for page_range extraction."
            }
        }

    except Exception as e:
        return {
            "content": f"Error creating summary: {str(e)}",
            "method_used": "error-fallback",
            "table_of_contents": {"note": "Summary generation failed"}
        }


def _smart_truncate_content(content: str, max_chars: int) -> str:
|
|
"""Intelligently truncate content while preserving structure and readability."""
|
|
if len(content) <= max_chars:
|
|
return content
|
|
|
|
lines = content.split('\n')
|
|
truncated_lines = []
|
|
current_length = 0
|
|
|
|
# Try to preserve structure by stopping at a natural break point
|
|
for line in lines:
|
|
line_length = len(line) + 1 # +1 for newline
|
|
|
|
# If adding this line would exceed limit
|
|
if current_length + line_length > max_chars:
|
|
# Try to find a good stopping point
|
|
if truncated_lines:
|
|
# Check if we're in the middle of a section
|
|
last_lines = '\n'.join(truncated_lines[-3:]) if len(truncated_lines) >= 3 else '\n'.join(truncated_lines)
|
|
|
|
# If we stopped mid-paragraph, remove incomplete paragraph
|
|
if not (line.strip() == '' or line.startswith('#') or line.startswith('|')):
|
|
# Remove lines until we hit a natural break
|
|
while truncated_lines and not (
|
|
truncated_lines[-1].strip() == '' or
|
|
truncated_lines[-1].startswith('#') or
|
|
truncated_lines[-1].startswith('|') or
|
|
truncated_lines[-1].startswith('-') or
|
|
truncated_lines[-1].startswith('*')
|
|
):
|
|
truncated_lines.pop()
|
|
break
|
|
|
|
truncated_lines.append(line)
|
|
current_length += line_length
|
|
|
|
# Add truncation notice
|
|
result = '\n'.join(truncated_lines)
|
|
result += f"\n\n---\n**[CONTENT TRUNCATED]**\nShowing {len(result):,} of {len(content):,} characters.\nUse smaller page ranges (e.g., 3-5 pages) for full content without truncation.\n---"
|
|
|
|
return result
|
|
|
|
|
|
def _estimate_section_length(heading_level: int) -> int:
|
|
"""Estimate how many pages a section might span based on heading level."""
|
|
# Higher level headings (H1) tend to have longer sections
|
|
if heading_level == 1: # Major chapters
|
|
return 8
|
|
elif heading_level == 2: # Major sections
|
|
return 4
|
|
elif heading_level == 3: # Subsections
|
|
return 2
|
|
else: # Minor headings
|
|
return 1
|
|
|
|
|
|
def _optimize_toc_page_ranges(toc_entries: list) -> dict[str, Any]:
|
|
"""Optimize table of contents page ranges based on actual heading positions."""
|
|
optimized_toc = {
|
|
"sections": [],
|
|
"total_sections": len(toc_entries),
|
|
"suggested_chunking": []
|
|
}
|
|
|
|
for i, entry in enumerate(toc_entries):
|
|
# Calculate actual end page based on next heading or document end
|
|
if i + 1 < len(toc_entries):
|
|
next_page = toc_entries[i + 1]["page"]
|
|
actual_end_page = max(entry["page"], next_page - 1)
|
|
else:
|
|
# Last section - use estimated length
|
|
actual_end_page = entry["page"] + _estimate_section_length(entry["level"])
|
|
|
|
optimized_entry = {
|
|
"level": entry["level"],
|
|
"title": entry["title"],
|
|
"start_page": entry["page"],
|
|
"estimated_end_page": actual_end_page,
|
|
"suggested_page_range": f"{entry['page']}-{actual_end_page}",
|
|
"section_type": _classify_section_type(entry["level"], entry["title"])
|
|
}
|
|
optimized_toc["sections"].append(optimized_entry)
|
|
|
|
# Generate chunking suggestions
|
|
optimized_toc["suggested_chunking"] = _generate_chunking_suggestions(optimized_toc["sections"])
|
|
|
|
return optimized_toc
|
|
|
|
|
|
def _classify_section_type(level: int, title: str) -> str:
|
|
"""Classify section type based on level and title patterns."""
|
|
title_lower = title.lower()
|
|
|
|
if level == 1:
|
|
if any(word in title_lower for word in ["chapter", "part", "section"]):
|
|
return "chapter"
|
|
elif any(word in title_lower for word in ["introduction", "conclusion", "summary"]):
|
|
return "special_section"
|
|
else:
|
|
return "major_section"
|
|
elif level == 2:
|
|
return "section"
|
|
elif level == 3:
|
|
return "subsection"
|
|
else:
|
|
return "minor_heading"


def _generate_chunking_suggestions(sections: list) -> list[dict[str, Any]]:
    """Generate smart chunking suggestions based on document structure."""
    suggestions = []
    current_chunk_pages = 0
    chunk_start = 1
    chunk_sections = []

    for section in sections:
        section_pages = section["estimated_end_page"] - section["start_page"] + 1

        # If adding this section would make chunk too large, finalize current chunk
        # Use smaller chunks (8 pages) to prevent MCP token limit issues
        if current_chunk_pages + section_pages > 8 and chunk_sections:
            suggestions.append({
                "chunk_number": len(suggestions) + 1,
                "page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}",
                "sections_included": [s["title"] for s in chunk_sections],
                "estimated_pages": current_chunk_pages,
                "description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" +
                               (f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "")
            })

            # Start new chunk
            chunk_start = section["start_page"]
            current_chunk_pages = section_pages
            chunk_sections = [section]
        else:
            # Add to current chunk
            current_chunk_pages += section_pages
            chunk_sections.append(section)

    # Add final chunk if any sections remain
    if chunk_sections:
        suggestions.append({
            "chunk_number": len(suggestions) + 1,
            "page_range": f"{chunk_start}-{chunk_sections[-1]['estimated_end_page']}",
            "sections_included": [s["title"] for s in chunk_sections],
            "estimated_pages": current_chunk_pages,
            "description": f"Chunk {len(suggestions) + 1}: {chunk_sections[0]['title']}" +
                           (f" + {len(chunk_sections)-1} more sections" if len(chunk_sections) > 1 else "")
        })

    return suggestions
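# Illustrative call (not executed; the section dicts are hypothetical outputs of
# _optimize_toc_page_ranges above):
#
#     sections = [
#         {"title": "Chapter 1", "start_page": 1, "estimated_end_page": 5},
#         {"title": "Chapter 2", "start_page": 6, "estimated_end_page": 9},
#     ]
#     chunks = _generate_chunking_suggestions(sections)
#     # Chapter 1 spans 5 pages and Chapter 2 spans 4; since 5 + 4 > 8, two chunks
#     # are suggested with page_range "1-5" and "6-9" respectively.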


def _has_page_break(paragraph) -> bool:
    """Check if a paragraph contains a page break."""
    try:
        # Check for explicit page breaks in paragraph runs
        for run in paragraph.runs:
            if run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br') is not None:
                br_elem = run._r.find('.//{http://schemas.openxmlformats.org/wordprocessingml/2006/main}br')
                if br_elem is not None and br_elem.get('{http://schemas.openxmlformats.org/wordprocessingml/2006/main}type') == 'page':
                    return True
        return False
    except Exception:
        return False
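# Hedged usage sketch (not executed; assumes python-docx is installed and
# "report.docx" is a hypothetical file path):
#
#     import docx
#     doc = docx.Document("report.docx")
#     page_break_count = sum(1 for p in doc.paragraphs if _has_page_break(p))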


def _parse_page_range(page_range: str) -> list[int]:
    """Parse page range string into list of page numbers.

    Examples:
        "1-5" -> [1, 2, 3, 4, 5]
        "1,3,5" -> [1, 3, 5]
        "1-3,5,7-9" -> [1, 2, 3, 5, 7, 8, 9]
    """
    pages = set()

    for part in page_range.split(','):
        part = part.strip()
        if '-' in part:
            # Handle range like "1-5"
            start, end = part.split('-', 1)
            try:
                start_num = int(start.strip())
                end_num = int(end.strip())
                pages.update(range(start_num, end_num + 1))
            except ValueError:
                continue
        else:
            # Handle single page like "3"
            try:
                pages.add(int(part))
            except ValueError:
                continue

    return sorted(list(pages))
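# Additional illustrative edge case (not executed): malformed parts are skipped, so
# _parse_page_range("2-4,abc,9") -> [2, 3, 4, 9] and _parse_page_range("") -> [].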


async def _analyze_document_size(file_path: str, extension: str) -> dict[str, Any]:
    """Analyze document to estimate size and complexity."""
    analysis = {
        "estimated_pages": 1,
        "file_size_mb": 0,
        "complexity": "simple",
        "estimated_content_size": "small"
    }

    try:
        # Get file size
        from pathlib import Path
        file_size = Path(file_path).stat().st_size
        analysis["file_size_mb"] = round(file_size / (1024 * 1024), 2)

        if extension == ".docx":
            try:
                import docx
                doc = docx.Document(file_path)

                # Estimate pages based on content
                paragraph_count = len(doc.paragraphs)
                table_count = len(doc.tables)

                # Rough estimation: ~40 paragraphs per page
                estimated_pages = max(1, paragraph_count // 40)
                analysis["estimated_pages"] = estimated_pages

                # Determine complexity
                if table_count > 10 or paragraph_count > 500:
                    analysis["complexity"] = "complex"
                elif table_count > 5 or paragraph_count > 200:
                    analysis["complexity"] = "moderate"

                # Estimate content size
                if estimated_pages > 20:
                    analysis["estimated_content_size"] = "very_large"
                elif estimated_pages > 10:
                    analysis["estimated_content_size"] = "large"
                elif estimated_pages > 5:
                    analysis["estimated_content_size"] = "medium"

            except Exception:
                # Fallback to file size estimation
                if file_size > 5 * 1024 * 1024:  # 5MB
                    analysis["estimated_pages"] = 50
                    analysis["estimated_content_size"] = "very_large"
                elif file_size > 1 * 1024 * 1024:  # 1MB
                    analysis["estimated_pages"] = 20
                    analysis["estimated_content_size"] = "large"
                elif file_size > 500 * 1024:  # 500KB
                    analysis["estimated_pages"] = 10
                    analysis["estimated_content_size"] = "medium"

    except Exception:
        pass

    return analysis
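# Hedged usage sketch (not executed; "big_report.docx" is a hypothetical path and the
# values shown only illustrate the shape this helper returns, not real measurements):
#
#     analysis = await _analyze_document_size("big_report.docx", ".docx")
#     # e.g. {"estimated_pages": 32, "file_size_mb": 2.4,
#     #       "complexity": "complex", "estimated_content_size": "very_large"}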


def _get_processing_recommendation(
    doc_analysis: dict[str, Any],
    page_range: str,
    summary_only: bool
) -> dict[str, Any]:
    """Generate intelligent processing recommendations based on document analysis."""

    estimated_pages = doc_analysis["estimated_pages"]
    content_size = doc_analysis["estimated_content_size"]

    recommendation = {
        "status": "optimal",
        "message": "",
        "suggested_workflow": [],
        "warnings": []
    }

    # Large document recommendations
    if content_size in ["large", "very_large"] and not page_range and not summary_only:
        recommendation["status"] = "suboptimal"
        recommendation["message"] = (
            f"⚠️ Large document detected ({estimated_pages} estimated pages). "
            "Consider using recommended workflow for better performance."
        )
        recommendation["suggested_workflow"] = [
            "1. First: Call with summary_only=true to get document overview and TOC",
            "2. Then: Use page_range to process specific sections (e.g., '1-5', '6-10', '15-20')",
            "3. Recommended: Use 3-8 page chunks to stay under 25k token MCP limit",
            "4. The tool auto-truncates if content is too large, but smaller ranges work better"
        ]
        recommendation["warnings"] = [
            "Page ranges >8 pages may hit 25k token response limit and get truncated",
            "Use smaller page ranges (3-5 pages) for dense content documents",
            "Auto-truncation preserves structure but loses content completeness"
        ]

    # Medium document recommendations
    elif content_size == "medium" and not page_range and not summary_only:
        recommendation["status"] = "caution"
        recommendation["message"] = (
            f"Medium document detected ({estimated_pages} estimated pages). "
            "Consider summary_only=true first if you encounter response size issues."
        )
        recommendation["suggested_workflow"] = [
            "Option 1: Try full processing (current approach)",
            "Option 2: Use summary_only=true first, then page_range if needed"
        ]

    # Optimal usage patterns
    elif summary_only:
        recommendation["message"] = "✅ Excellent! Using summary mode for initial document analysis."
        recommendation["suggested_workflow"] = [
            "After reviewing summary, use page_range to extract specific sections of interest"
        ]

    elif page_range and content_size in ["large", "very_large"]:
        recommendation["message"] = "✅ Perfect! Using page-range processing for efficient extraction."

    elif content_size == "small":
        recommendation["message"] = "✅ Small document - full processing is optimal."

    return recommendation
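# Illustrative call (not executed; the doc_analysis dict is a hypothetical result of
# _analyze_document_size above):
#
#     rec = _get_processing_recommendation(
#         {"estimated_pages": 40, "estimated_content_size": "very_large",
#          "file_size_mb": 6.1, "complexity": "complex"},
#         page_range="",
#         summary_only=False,
#     )
#     # rec["status"] == "suboptimal", with a suggested summary_only -> page_range workflow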


def main():
    """Main entry point for the MCP server."""
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "--version":
        from . import __version__
        print(f"MCP Office Tools v{__version__}")
        return

    # Run the FastMCP server
    app.run()


if __name__ == "__main__":
    main()
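# Hedged note: invoking this entry point with "--version" prints the package version
# and exits; any other invocation calls app.run(), which starts the FastMCP server.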