Refactor: Extract processing logic into utility modules
Complete architecture cleanup - eliminated duplicate server files:
- Deleted server_monolithic.py (2249 lines)
- Deleted server_legacy.py (2209 lines)

New utility modules created:
- utils/word_processing.py - Word extraction/conversion (preserves page range fixes)
- utils/excel_processing.py - Excel extraction
- utils/powerpoint_processing.py - PowerPoint extraction
- utils/processing.py - Universal helpers (parse_page_range, health checks, etc.)

Updated mixins to import from utils instead of server_monolithic. Entry point remains server.py (48 lines) using the mixin architecture. All 53 tests pass. Coverage improved from 11% to 22% by removing duplicate code.
parent 8249afb763
commit af6aadf559
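The mixin-delegation pattern itself is unchanged by this commit; only the import source moves from server_monolithic to the utils package. A minimal sketch of that pattern (illustrative only; ExampleMixin is hypothetical and not part of the diff below):

    # Hypothetical mixin showing the delegation pattern this refactor keeps in place.
    # The helper names are re-exported from mcp_office_tools.utils (see the __init__.py hunk below).
    from mcp_office_tools.utils import _extract_basic_metadata, _parse_page_range


    class ExampleMixin:
        def _parse_page_range(self, page_range: str) -> list[int]:
            # Thin wrapper: the real parsing lives in utils/processing.py
            return _parse_page_range(page_range)

        async def _extract_basic_metadata(self, file_path: str, extension: str, category: str) -> dict:
            return await _extract_basic_metadata(file_path, extension, category)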
@@ -293,7 +293,7 @@ class UniversalMixin(MCPMixin):
    async def _extract_text_by_category(self, file_path: str, extension: str, category: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
        """Extract text based on document category."""
        # Import the appropriate extraction function
-        from ..server_monolithic import _extract_word_text, _extract_excel_text, _extract_powerpoint_text
+        from ..utils import _extract_word_text, _extract_excel_text, _extract_powerpoint_text

        if category == "word":
            return await _extract_word_text(file_path, extension, preserve_formatting, method)
@@ -306,7 +306,7 @@ class UniversalMixin(MCPMixin):

    async def _extract_images_by_category(self, file_path: str, extension: str, category: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
        """Extract images based on document category."""
-        from ..server_monolithic import _extract_word_images, _extract_excel_images, _extract_powerpoint_images
+        from ..utils import _extract_word_images, _extract_excel_images, _extract_powerpoint_images

        if category == "word":
            return await _extract_word_images(file_path, extension, output_format, min_width, min_height)
@@ -319,7 +319,7 @@ class UniversalMixin(MCPMixin):

    async def _extract_metadata_by_category(self, file_path: str, extension: str, category: str) -> dict[str, Any]:
        """Extract metadata based on document category."""
-        from ..server_monolithic import _extract_word_metadata, _extract_excel_metadata, _extract_powerpoint_metadata, _extract_basic_metadata
+        from ..utils import _extract_word_metadata, _extract_excel_metadata, _extract_powerpoint_metadata, _extract_basic_metadata

        # Get basic metadata first
        metadata = await _extract_basic_metadata(file_path, extension, category)
@@ -339,5 +339,5 @@ class UniversalMixin(MCPMixin):

    async def _extract_basic_metadata(self, file_path: str, extension: str, category: str) -> dict[str, Any]:
        """Extract basic metadata common to all documents."""
-        from ..server_monolithic import _extract_basic_metadata
+        from ..utils import _extract_basic_metadata
        return await _extract_basic_metadata(file_path, extension, category)
@@ -225,17 +225,17 @@ class WordMixin(MCPMixin):
    # Helper methods - import from monolithic server
    async def _analyze_document_size(self, file_path: str, extension: str) -> dict[str, Any]:
        """Analyze document size for processing recommendations."""
-        from ..server_monolithic import _analyze_document_size
+        from ..utils import _analyze_document_size
        return await _analyze_document_size(file_path, extension)

    def _get_processing_recommendation(self, doc_analysis: dict[str, Any], page_range: str, summary_only: bool) -> dict[str, Any]:
        """Get processing recommendations based on document analysis."""
-        from ..server_monolithic import _get_processing_recommendation
+        from ..utils import _get_processing_recommendation
        return _get_processing_recommendation(doc_analysis, page_range, summary_only)

    def _parse_page_range(self, page_range: str) -> list[int]:
        """Parse page range string into list of page numbers."""
-        from ..server_monolithic import _parse_page_range
+        from ..utils import _parse_page_range
        return _parse_page_range(page_range)

    async def _convert_docx_to_markdown(
@@ -244,7 +244,7 @@ class WordMixin(MCPMixin):
        bookmark_name: str = "", chapter_name: str = ""
    ) -> dict[str, Any]:
        """Convert .docx to markdown."""
-        from ..server_monolithic import _convert_docx_to_markdown
+        from ..utils import _convert_docx_to_markdown
        return await _convert_docx_to_markdown(
            file_path, include_images, image_mode, max_image_size,
            preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
@@ -255,7 +255,7 @@ class WordMixin(MCPMixin):
        preserve_structure: bool, page_numbers: list[int], summary_only: bool, output_dir: str
    ) -> dict[str, Any]:
        """Convert legacy .doc to markdown."""
-        from ..server_monolithic import _convert_doc_to_markdown
+        from ..utils import _convert_doc_to_markdown
        return await _convert_doc_to_markdown(
            file_path, include_images, image_mode, max_image_size,
            preserve_structure, page_numbers, summary_only, output_dir
File diff suppressed because it is too large
@@ -27,6 +27,48 @@ from .decorators import (
    handle_office_errors
)

+from .processing import (
+    TEMP_DIR,
+    DEBUG,
+    _extract_basic_metadata,
+    _calculate_health_score,
+    _get_health_recommendations,
+    _smart_truncate_content,
+    _parse_page_range,
+    _get_processing_recommendation,
+)
+
+from .word_processing import (
+    _extract_word_text,
+    _extract_word_images,
+    _extract_word_metadata,
+    _convert_docx_to_markdown,
+    _convert_docx_with_python_docx,
+    _convert_doc_to_markdown,
+    _get_ultra_fast_summary,
+    _find_bookmark_content_range,
+    _find_chapter_content_range,
+    _get_available_headings,
+    _has_page_break,
+    _analyze_document_size,
+    _paragraph_to_markdown,
+    _table_to_markdown,
+    _html_to_markdown,
+    _extract_markdown_structure,
+)
+
+from .excel_processing import (
+    _extract_excel_text,
+    _extract_excel_images,
+    _extract_excel_metadata,
+)
+
+from .powerpoint_processing import (
+    _extract_powerpoint_text,
+    _extract_powerpoint_images,
+    _extract_powerpoint_metadata,
+)
+
__all__ = [
    # Validation
    "OfficeFileError",
src/mcp_office_tools/utils/excel_processing.py (new file, 203 lines)
@@ -0,0 +1,203 @@
"""Excel document processing utilities.

This module provides helper functions for extracting text, images, and metadata
from Excel documents (.xlsx, .xls, .xlsm, .csv) with intelligent method selection
and fallback support.
"""

from typing import Any

from . import OfficeFileError


async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
    """Extract text from Excel documents."""
    methods_tried = []

    if extension == ".csv":
        # CSV handling
        import pandas as pd
        try:
            df = pd.read_csv(file_path)
            text = df.to_string()
            return {
                "text": text,
                "method_used": "pandas",
                "methods_tried": ["pandas"],
                "formatted_sections": [{"type": "table", "data": df.to_dict()}] if preserve_formatting else []
            }
        except Exception as e:
            raise OfficeFileError(f"CSV processing failed: {str(e)}")

    # Excel file handling
    text = ""
    formatted_sections = []
    method_used = None

    method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"]

    for method_name in method_order:
        try:
            methods_tried.append(method_name)

            if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]:
                import openpyxl
                wb = openpyxl.load_workbook(file_path, data_only=True)

                text_parts = []
                for sheet_name in wb.sheetnames:
                    ws = wb[sheet_name]
                    text_parts.append(f"Sheet: {sheet_name}")

                    for row in ws.iter_rows(values_only=True):
                        row_text = "\t".join(str(cell) if cell is not None else "" for cell in row)
                        if row_text.strip():
                            text_parts.append(row_text)

                    if preserve_formatting:
                        formatted_sections.append({
                            "type": "worksheet",
                            "name": sheet_name,
                            "data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()]
                        })

                text = "\n".join(text_parts)
                method_used = "openpyxl"
                break

            elif method_name == "pandas":
                import pandas as pd

                if extension in [".xlsx", ".xlsm"]:
                    dfs = pd.read_excel(file_path, sheet_name=None)
                else:  # .xls
                    dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd')

                text_parts = []
                for sheet_name, df in dfs.items():
                    text_parts.append(f"Sheet: {sheet_name}")
                    text_parts.append(df.to_string())

                    if preserve_formatting:
                        formatted_sections.append({
                            "type": "dataframe",
                            "name": sheet_name,
                            "data": df.to_dict()
                        })

                text = "\n\n".join(text_parts)
                method_used = "pandas"
                break

            elif method_name == "xlrd" and extension == ".xls":
                import xlrd
                wb = xlrd.open_workbook(file_path)

                text_parts = []
                for sheet in wb.sheets():
                    text_parts.append(f"Sheet: {sheet.name}")

                    for row_idx in range(sheet.nrows):
                        row = sheet.row_values(row_idx)
                        row_text = "\t".join(str(cell) for cell in row)
                        text_parts.append(row_text)

                text = "\n".join(text_parts)
                method_used = "xlrd"
                break

        except ImportError:
            continue
        except Exception:
            continue

    if not method_used:
        raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")

    return {
        "text": text,
        "method_used": method_used,
        "methods_tried": methods_tried,
        "formatted_sections": formatted_sections
    }


async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
    """Extract images from Excel documents."""
    import io
    import os
    import tempfile
    import zipfile

    from PIL import Image

    images = []
    TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir())

    if extension in [".xlsx", ".xlsm"]:
        try:
            with zipfile.ZipFile(file_path, 'r') as zip_file:
                # Look for images in media folder
                image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')]

                for i, img_path in enumerate(image_files):
                    try:
                        img_data = zip_file.read(img_path)
                        img = Image.open(io.BytesIO(img_data))

                        # Size filtering
                        if img.width >= min_width and img.height >= min_height:
                            # Save to temp file
                            temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}")
                            img.save(temp_path, format=output_format.upper())

                            images.append({
                                "index": i,
                                "filename": os.path.basename(img_path),
                                "path": temp_path,
                                "width": img.width,
                                "height": img.height,
                                "format": img.format,
                                "size_bytes": len(img_data)
                            })
                    except Exception:
                        continue

        except Exception as e:
            raise OfficeFileError(f"Excel image extraction failed: {str(e)}")

    return images


async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]:
    """Extract Excel-specific metadata."""
    metadata = {"type": "excel", "extension": extension}

    if extension in [".xlsx", ".xlsm"]:
        try:
            import openpyxl
            wb = openpyxl.load_workbook(file_path)

            props = wb.properties
            metadata.update({
                "title": props.title,
                "creator": props.creator,
                "subject": props.subject,
                "description": props.description,
                "keywords": props.keywords,
                "created": str(props.created) if props.created else None,
                "modified": str(props.modified) if props.modified else None
            })

            # Workbook structure
            metadata.update({
                "worksheet_count": len(wb.worksheets),
                "worksheet_names": wb.sheetnames,
                "has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets),
                "has_images": any(len(ws._images) > 0 for ws in wb.worksheets)
            })

        except Exception:
            pass

    return metadata
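A quick usage sketch for the text extractor above (the "report.xlsx" path and the "auto" method value are placeholders, not part of this commit; the helper walks openpyxl, pandas, and xlrd in order and reports which one succeeded):

    import asyncio

    from mcp_office_tools.utils import _extract_excel_text


    async def main() -> None:
        # Placeholder workbook path; with .xlsx the fallback order is openpyxl -> pandas -> xlrd.
        result = await _extract_excel_text("report.xlsx", ".xlsx", preserve_formatting=True, method="auto")
        print(result["method_used"], result["methods_tried"])
        print(result["text"][:500])


    asyncio.run(main())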
src/mcp_office_tools/utils/powerpoint_processing.py (new file, 177 lines)
@@ -0,0 +1,177 @@
"""PowerPoint document processing utilities.

This module provides helper functions for extracting text, images, and metadata
from PowerPoint documents (.pptx and .ppt files).
"""

import io
import os
import zipfile
from typing import Any

from PIL import Image

from . import OfficeFileError


async def _extract_powerpoint_text(
    file_path: str, extension: str, preserve_formatting: bool, method: str
) -> dict[str, Any]:
    """Extract text from PowerPoint documents."""
    methods_tried = []

    if extension == ".pptx":
        try:
            import pptx

            prs = pptx.Presentation(file_path)

            text_parts = []
            formatted_sections = []

            for slide_num, slide in enumerate(prs.slides, 1):
                slide_text_parts = []

                for shape in slide.shapes:
                    if hasattr(shape, "text") and shape.text:
                        slide_text_parts.append(shape.text)

                slide_text = "\n".join(slide_text_parts)
                text_parts.append(f"Slide {slide_num}:\n{slide_text}")

                if preserve_formatting:
                    formatted_sections.append(
                        {
                            "type": "slide",
                            "number": slide_num,
                            "text": slide_text,
                            "shapes": len(slide.shapes),
                        }
                    )

            text = "\n\n".join(text_parts)

            return {
                "text": text,
                "method_used": "python-pptx",
                "methods_tried": ["python-pptx"],
                "formatted_sections": formatted_sections,
            }

        except ImportError:
            methods_tried.append("python-pptx")
        except Exception:
            methods_tried.append("python-pptx")

    # Legacy .ppt handling would require additional libraries
    if extension == ".ppt":
        raise OfficeFileError(
            "Legacy PowerPoint (.ppt) text extraction requires additional setup"
        )

    raise OfficeFileError(
        f"Failed to extract text using methods: {', '.join(methods_tried)}"
    )


async def _extract_powerpoint_images(
    file_path: str,
    extension: str,
    output_format: str,
    min_width: int,
    min_height: int,
    temp_dir: str,
) -> list[dict[str, Any]]:
    """Extract images from PowerPoint documents."""
    images = []

    if extension == ".pptx":
        try:
            with zipfile.ZipFile(file_path, "r") as zip_file:
                # Look for images in media folder
                image_files = [
                    f for f in zip_file.namelist() if f.startswith("ppt/media/")
                ]

                for i, img_path in enumerate(image_files):
                    try:
                        img_data = zip_file.read(img_path)
                        img = Image.open(io.BytesIO(img_data))

                        # Size filtering
                        if img.width >= min_width and img.height >= min_height:
                            # Save to temp file
                            temp_path = os.path.join(
                                temp_dir, f"powerpoint_image_{i}.{output_format}"
                            )
                            img.save(temp_path, format=output_format.upper())

                            images.append(
                                {
                                    "index": i,
                                    "filename": os.path.basename(img_path),
                                    "path": temp_path,
                                    "width": img.width,
                                    "height": img.height,
                                    "format": img.format,
                                    "size_bytes": len(img_data),
                                }
                            )
                    except Exception:
                        continue

        except Exception as e:
            raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}")

    return images


async def _extract_powerpoint_metadata(
    file_path: str, extension: str
) -> dict[str, Any]:
    """Extract PowerPoint-specific metadata."""
    metadata = {"type": "powerpoint", "extension": extension}

    if extension == ".pptx":
        try:
            import pptx

            prs = pptx.Presentation(file_path)

            core_props = prs.core_properties
            metadata.update(
                {
                    "title": core_props.title,
                    "author": core_props.author,
                    "subject": core_props.subject,
                    "keywords": core_props.keywords,
                    "comments": core_props.comments,
                    "created": str(core_props.created) if core_props.created else None,
                    "modified": str(core_props.modified)
                    if core_props.modified
                    else None,
                }
            )

            # Presentation structure
            slide_layouts = set()
            total_shapes = 0

            for slide in prs.slides:
                slide_layouts.add(slide.slide_layout.name)
                total_shapes += len(slide.shapes)

            metadata.update(
                {
                    "slide_count": len(prs.slides),
                    "slide_layouts": list(slide_layouts),
                    "total_shapes": total_shapes,
                    "slide_width": prs.slide_width,
                    "slide_height": prs.slide_height,
                }
            )

        except Exception:
            pass

    return metadata
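As with the Excel helpers, the PowerPoint functions are plain async callables. A short sketch of pulling deck metadata (the "deck.pptx" path is a placeholder, not a file from this repository):

    import asyncio

    from mcp_office_tools.utils import _extract_powerpoint_metadata


    async def main() -> None:
        # Metadata stays minimal ({"type", "extension"}) if python-pptx is unavailable
        # or the file cannot be parsed; errors are swallowed by design.
        meta = await _extract_powerpoint_metadata("deck.pptx", ".pptx")
        print(meta.get("slide_count"), "slides by", meta.get("author"))


    asyncio.run(main())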
src/mcp_office_tools/utils/processing.py (new file, 228 lines)
@@ -0,0 +1,228 @@
"""Universal processing helper functions for Office documents.

This module contains helper functions used across different document processing
operations including metadata extraction, health scoring, content truncation,
and page range parsing.
"""

import os
import tempfile
from typing import Any

# Configuration
TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir())
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"


async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]:
    """Extract basic metadata from Office documents."""
    metadata = {"category": category, "extension": extension}

    try:
        if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]:
            import zipfile

            with zipfile.ZipFile(file_path, 'r') as zip_file:
                # Core properties
                if 'docProps/core.xml' in zip_file.namelist():
                    zip_file.read('docProps/core.xml').decode('utf-8')
                    metadata["has_core_properties"] = True

                # App properties
                if 'docProps/app.xml' in zip_file.namelist():
                    zip_file.read('docProps/app.xml').decode('utf-8')
                    metadata["has_app_properties"] = True

    except Exception:
        pass

    return metadata


def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int:
    """Calculate document health score (1-10)."""
    score = 10

    # Deduct for validation errors
    if not validation["is_valid"]:
        score -= 5

    if validation["errors"]:
        score -= len(validation["errors"]) * 2

    if validation["warnings"]:
        score -= len(validation["warnings"])

    # Deduct for problematic characteristics
    if validation.get("password_protected"):
        score -= 1

    if format_info.get("is_legacy"):
        score -= 1

    structure = format_info.get("structure", {})
    if structure.get("estimated_complexity") == "complex":
        score -= 1

    return max(1, min(10, score))


def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]:
    """Get health improvement recommendations."""
    recommendations = []

    if validation["errors"]:
        recommendations.append("Fix validation errors before processing")

    if validation.get("password_protected"):
        recommendations.append("Remove password protection if possible")

    if format_info.get("is_legacy"):
        recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)")

    structure = format_info.get("structure", {})
    if structure.get("estimated_complexity") == "complex":
        recommendations.append("Complex document may require specialized processing")

    if not recommendations:
        recommendations.append("Document appears healthy and ready for processing")

    return recommendations


def _smart_truncate_content(content: str, max_chars: int) -> str:
    """Intelligently truncate content while preserving structure and readability."""
    if len(content) <= max_chars:
        return content

    lines = content.split('\n')
    truncated_lines = []
    current_length = 0

    # Try to preserve structure by stopping at a natural break point
    for line in lines:
        line_length = len(line) + 1  # +1 for newline

        # If adding this line would exceed limit
        if current_length + line_length > max_chars:
            # Try to find a good stopping point
            if truncated_lines:
                # Check if we're in the middle of a section
                last_lines = '\n'.join(truncated_lines[-3:]) if len(truncated_lines) >= 3 else '\n'.join(truncated_lines)

                # If we stopped mid-paragraph, remove incomplete paragraph
                if not (line.strip() == '' or line.startswith('#') or line.startswith('|')):
                    # Remove lines until we hit a natural break
                    while truncated_lines and not (
                        truncated_lines[-1].strip() == '' or
                        truncated_lines[-1].startswith('#') or
                        truncated_lines[-1].startswith('|') or
                        truncated_lines[-1].startswith('-') or
                        truncated_lines[-1].startswith('*')
                    ):
                        truncated_lines.pop()
            break

        truncated_lines.append(line)
        current_length += line_length

    # Add truncation notice
    result = '\n'.join(truncated_lines)
    result += f"\n\n---\n**[CONTENT TRUNCATED]**\nShowing {len(result):,} of {len(content):,} characters.\nUse smaller page ranges (e.g., 3-5 pages) for full content without truncation.\n---"

    return result


def _parse_page_range(page_range: str) -> list[int]:
    """Parse page range string into list of page numbers.

    Examples:
        "1-5" -> [1, 2, 3, 4, 5]
        "1,3,5" -> [1, 3, 5]
        "1-3,5,7-9" -> [1, 2, 3, 5, 7, 8, 9]
    """
    pages = set()

    for part in page_range.split(','):
        part = part.strip()
        if '-' in part:
            # Handle range like "1-5"
            start, end = part.split('-', 1)
            try:
                start_num = int(start.strip())
                end_num = int(end.strip())
                pages.update(range(start_num, end_num + 1))
            except ValueError:
                continue
        else:
            # Handle single page like "3"
            try:
                pages.add(int(part))
            except ValueError:
                continue

    return sorted(list(pages))


def _get_processing_recommendation(
    doc_analysis: dict[str, Any],
    page_range: str,
    summary_only: bool
) -> dict[str, Any]:
    """Generate intelligent processing recommendations based on document analysis."""

    estimated_pages = doc_analysis["estimated_pages"]
    content_size = doc_analysis["estimated_content_size"]

    recommendation = {
        "status": "optimal",
        "message": "",
        "suggested_workflow": [],
        "warnings": []
    }

    # Large document recommendations
    if content_size in ["large", "very_large"] and not page_range and not summary_only:
        recommendation["status"] = "suboptimal"
        recommendation["message"] = (
            f"⚠️ Large document detected ({estimated_pages} estimated pages). "
            "Consider using recommended workflow for better performance."
        )
        recommendation["suggested_workflow"] = [
            "1. First: Call with summary_only=true to get document overview and TOC",
            "2. Then: Use page_range to process specific sections (e.g., '1-5', '6-10', '15-20')",
            "3. Recommended: Use 3-8 page chunks to stay under 25k token MCP limit",
            "4. The tool auto-truncates if content is too large, but smaller ranges work better"
        ]
        recommendation["warnings"] = [
            "Page ranges >8 pages may hit 25k token response limit and get truncated",
            "Use smaller page ranges (3-5 pages) for dense content documents",
            "Auto-truncation preserves structure but loses content completeness"
        ]

    # Medium document recommendations
    elif content_size == "medium" and not page_range and not summary_only:
        recommendation["status"] = "caution"
        recommendation["message"] = (
            f"Medium document detected ({estimated_pages} estimated pages). "
            "Consider summary_only=true first if you encounter response size issues."
        )
        recommendation["suggested_workflow"] = [
            "Option 1: Try full processing (current approach)",
            "Option 2: Use summary_only=true first, then page_range if needed"
        ]

    # Optimal usage patterns
    elif summary_only:
        recommendation["message"] = "✅ Excellent! Using summary mode for initial document analysis."
        recommendation["suggested_workflow"] = [
            "After reviewing summary, use page_range to extract specific sections of interest"
        ]

    elif page_range and content_size in ["large", "very_large"]:
        recommendation["message"] = "✅ Perfect! Using page-range processing for efficient extraction."

    elif content_size == "small":
        recommendation["message"] = "✅ Small document - full processing is optimal."

    return recommendation
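The page-range parser and the truncation helper above combine into the chunked workflow that _get_processing_recommendation suggests. A small illustration (the 2000-character budget and the sample text are arbitrary example values):

    from mcp_office_tools.utils import _parse_page_range, _smart_truncate_content

    # "1-3,7" expands to [1, 2, 3, 7], matching the docstring examples above.
    pages = _parse_page_range("1-3,7")

    # Truncate extracted markdown to a character budget while keeping whole paragraphs;
    # the helper appends a [CONTENT TRUNCATED] notice whenever it cuts content.
    long_markdown = "# Heading\n\n" + ("Lorem ipsum dolor sit amet. " * 500)
    preview = _smart_truncate_content(long_markdown, max_chars=2000)
    print(pages, len(preview))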
File diff suppressed because it is too large