Ryan Malloy 4b38f6455c Add document navigation tools and MCP prompts
New tools for Word document analysis:
- extract_entities: Pattern-based extraction of people, places, organizations
- get_chapter_summaries: Chapter previews with opening sentences and word counts
- save_reading_progress: Bookmark reading position to JSON file
- get_reading_progress: Resume reading from saved position

New MCP prompts (basic to advanced workflows):
- explore-document: Get started with a new document
- find-character: Track character mentions
- chapter-preview: Quick chapter overviews
- resume-reading: Continue where you left off
- document-analysis: Comprehensive multi-tool analysis
- character-journey: Track character arc through narrative
- document-comparison: Compare entities between chapters
- full-reading-session: Guided reading with bookmarking
- manuscript-review: Complete editorial workflow

Updated test counts for 19 total tools (6 universal + 10 word + 3 excel)
2026-01-11 07:23:15 -07:00

"""Word Document Tools Mixin - Specialized tools for Word document processing."""
import os
import time
from typing import Any, Optional
from fastmcp.contrib.mcp_mixin import MCPMixin, mcp_tool
from pydantic import Field
from ..utils import (
OfficeFileError,
resolve_office_file_path,
validate_office_file,
detect_format,
resolve_field_defaults,
handle_office_errors
)
from ..pagination import paginate_document_conversion, PaginationParams
class WordMixin(MCPMixin):
"""Mixin containing Word-specific tools for advanced document processing."""
@mcp_tool(
name="convert_to_markdown",
description="Convert Office documents to Markdown format with intelligent processing and automatic pagination for large documents. ⚠️ LARGE DOCUMENT HANDLING: Documents exceeding 25k tokens are automatically paginated into manageable sections. Use cursor_id to continue through pages. For massive documents (200+ pages), pagination prevents token limit errors while preserving document structure and context."
)
@handle_office_errors("Markdown conversion")
@resolve_field_defaults(
include_images=True,
image_mode="base64",
max_image_size=1024*1024,
preserve_structure=True,
page_range="",
bookmark_name="",
chapter_name="",
summary_only=False,
output_dir="",
limit=50,
cursor_id=None,
session_id=None,
return_all=False
)
async def convert_to_markdown(
self,
file_path: str = Field(description="Path to Office document or URL"),
include_images: bool = Field(default=True, description="Include images in markdown output. How images are included (files, base64, or references) is controlled by image_mode."),
image_mode: str = Field(default="files", description="Image handling mode: 'files' (default, saves to disk and links), 'base64' (embeds inline - WARNING: can create massive responses), or 'references' (metadata only, no content)"),
max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding (only used when image_mode='base64')"),
preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"),
page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). RECOMMENDED for large documents. Empty = all pages"),
bookmark_name: str = Field(default="", description="Extract content for a specific bookmark/chapter (e.g., 'Chapter1_Start'). More reliable than page ranges."),
chapter_name: str = Field(default="", description="Extract content for a chapter by heading text (e.g., 'Chapter 1', 'Introduction'). Works when bookmarks aren't available."),
summary_only: bool = Field(default=False, description="Return only metadata and truncated summary. STRONGLY RECOMMENDED for large docs (>10 pages)"),
output_dir: str = Field(default="", description="Output directory for extracted image files. If empty, uses a temp directory based on document name."),
# Pagination parameters
limit: int = Field(default=50, description="Maximum number of document sections to return per page"),
cursor_id: Optional[str] = Field(default=None, description="Cursor ID for pagination continuation"),
session_id: Optional[str] = Field(default=None, description="Session ID for pagination isolation"),
return_all: bool = Field(default=False, description="Return entire document bypassing pagination (WARNING: may exceed token limits)")
) -> dict[str, Any]:
start_time = time.time()
# Resolve file path
local_path = await resolve_office_file_path(file_path)
# Validate file
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Get format info
format_info = await detect_format(local_path)
category = format_info["category"]
extension = format_info["extension"]
# Currently focused on Word documents for markdown conversion
if category != "word":
raise OfficeFileError(f"Markdown conversion currently only supports Word documents, got: {category}")
# Analyze document size and provide intelligent recommendations
doc_analysis = await self._analyze_document_size(local_path, extension)
processing_recommendation = self._get_processing_recommendation(
doc_analysis, page_range, summary_only
)
# Parse page range if provided
page_numbers = self._parse_page_range(page_range) if page_range else None
# Prioritize bookmark/chapter extraction over page ranges
if bookmark_name or chapter_name:
page_numbers = None # Ignore page ranges when bookmark or chapter is specified
# Convert to markdown based on format
if extension == ".docx":
markdown_result = await self._convert_docx_to_markdown(
local_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
)
else: # .doc
# For legacy .doc files, use mammoth if available
markdown_result = await self._convert_doc_to_markdown(
local_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
# Check if pagination is needed
markdown_content = markdown_result["content"]
estimated_tokens = len(markdown_content) // 4 # Rough token estimation
# Generate session ID if not provided
if not session_id:
session_id = f"word-{int(time.time())}-{os.getpid()}"
# Create pagination parameters
pagination_params = PaginationParams(
limit=limit,
cursor_id=cursor_id,
session_id=session_id,
return_all=return_all
)
# Apply pagination when continuing from a cursor, when the content is very large (>25k tokens),
# or when it exceeds ~8k tokens and return_all was not requested
should_paginate = (cursor_id or estimated_tokens > 25000 or (not return_all and estimated_tokens > 8000))
if should_paginate:
paginated_result = paginate_document_conversion(
tool_name="convert_to_markdown",
document_path=local_path,
markdown_content=markdown_content,
params=pagination_params,
session_id=session_id,
total_estimated_tokens=estimated_tokens
)
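# To fetch the next page, a client calls convert_to_markdown again with the cursor value
# returned under paginated_result["pagination"] as cursor_id, reusing the same session_id
# (illustrative flow; the exact cursor field name is defined by paginate_document_conversion)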
# If pagination was applied, return the paginated result
if "pagination" in paginated_result:
# Add metadata to the paginated result
paginated_result["metadata"] = {
"original_file": os.path.basename(local_path),
"format": format_info["format_name"],
"conversion_method": markdown_result["method_used"],
"conversion_time": round(time.time() - start_time, 3),
"summary_only": summary_only,
"document_analysis": doc_analysis,
"processing_recommendation": processing_recommendation,
"session_id": session_id
}
# Add additional metadata from original result
if "images" in markdown_result:
paginated_result["metadata"]["images_found"] = len(markdown_result["images"])
if "structure" in markdown_result:
paginated_result["metadata"]["structure_preserved"] = bool(markdown_result["structure"])
return paginated_result
# Build result based on mode (non-paginated or bypass pagination)
result = {
"metadata": {
"original_file": os.path.basename(local_path),
"format": format_info["format_name"],
"conversion_method": markdown_result["method_used"],
"conversion_time": round(time.time() - start_time, 3),
"summary_only": summary_only,
"document_analysis": doc_analysis,
"processing_recommendation": processing_recommendation,
"session_id": session_id,
"estimated_tokens": estimated_tokens
}
}
# Add page range info if used
if page_range:
result["metadata"]["page_range"] = page_range
result["metadata"]["pages_processed"] = len(page_numbers) if page_numbers else 0
# Add content based on mode
if summary_only:
# VERY restrictive summary mode to prevent massive responses
result["metadata"]["character_count"] = len(markdown_result["content"])
result["metadata"]["word_count"] = len(markdown_result["content"].split())
# Ultra-short summary (only 500 chars max)
result["summary"] = markdown_result["content"][:500] + "..." if len(markdown_result["content"]) > 500 else markdown_result["content"]
# Severely limit table of contents to prevent 1M+ token responses
if "table_of_contents" in markdown_result:
toc = markdown_result["table_of_contents"]
if isinstance(toc, dict):
# Keep only essential TOC info, severely truncated
result["table_of_contents"] = {
"note": toc.get("note", ""),
"basic_info": toc.get("basic_info", "")[:200], # Limit to 200 chars
}
# Add bookmark/heading info if available (limit to first 5 items)
if "bookmarks" in toc:
result["table_of_contents"]["bookmarks"] = toc["bookmarks"][:5]
result["table_of_contents"]["bookmark_count"] = toc.get("bookmark_count", 0)
if "available_headings" in toc:
result["table_of_contents"]["available_headings"] = toc["available_headings"][:5]
result["table_of_contents"]["heading_count"] = toc.get("heading_count", 0)
else:
result["table_of_contents"] = {"note": "Summary mode - use full processing for detailed TOC"}
else:
# Full content mode
result["markdown"] = markdown_result["content"]
result["content_truncated"] = len(markdown_result["content"]) >= 200000 # Warn if near limit
# Add images info
if "images" in markdown_result:
result["images"] = markdown_result["images"]
# Add structure info
if "structure" in markdown_result:
result["structure"] = markdown_result["structure"]
# Add table of contents if available
if "table_of_contents" in markdown_result:
result["table_of_contents"] = markdown_result["table_of_contents"]
return result
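# Illustrative call patterns for large manuscripts (parameter values are examples only):
#   convert_to_markdown(path, summary_only=True)          -> metadata, stats, truncated TOC
#   convert_to_markdown(path, chapter_name="Chapter 3")   -> one chapter at a time
#   convert_to_markdown(path, page_range="1-5")           -> a bounded page slice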
# Helper methods - import from monolithic server
async def _analyze_document_size(self, file_path: str, extension: str) -> dict[str, Any]:
"""Analyze document size for processing recommendations."""
from ..utils import _analyze_document_size
return await _analyze_document_size(file_path, extension)
def _get_processing_recommendation(self, doc_analysis: dict[str, Any], page_range: str, summary_only: bool) -> dict[str, Any]:
"""Get processing recommendations based on document analysis."""
from ..utils import _get_processing_recommendation
return _get_processing_recommendation(doc_analysis, page_range, summary_only)
def _parse_page_range(self, page_range: str) -> list[int]:
"""Parse page range string into list of page numbers."""
from ..utils import _parse_page_range
return _parse_page_range(page_range)
async def _convert_docx_to_markdown(
self, file_path: str, include_images: bool, image_mode: str, max_image_size: int,
preserve_structure: bool, page_numbers: list[int], summary_only: bool, output_dir: str,
bookmark_name: str = "", chapter_name: str = ""
) -> dict[str, Any]:
"""Convert .docx to markdown."""
from ..utils import _convert_docx_to_markdown
return await _convert_docx_to_markdown(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
)
async def _convert_doc_to_markdown(
self, file_path: str, include_images: bool, image_mode: str, max_image_size: int,
preserve_structure: bool, page_numbers: list[int], summary_only: bool, output_dir: str
) -> dict[str, Any]:
"""Convert legacy .doc to markdown."""
from ..utils import _convert_doc_to_markdown
return await _convert_doc_to_markdown(
file_path, include_images, image_mode, max_image_size,
preserve_structure, page_numbers, summary_only, output_dir
)
@mcp_tool(
name="extract_word_tables",
description="Extract all tables from Word documents with structure, styling, and data conversion options. Returns tables as structured data with CSV/JSON export capability."
)
@handle_office_errors("Table extraction")
@resolve_field_defaults(
include_styling=True,
output_format="structured",
preserve_merged_cells=True,
include_headers=True
)
async def extract_word_tables(
self,
file_path: str = Field(description="Path to Word document or URL"),
include_styling: bool = Field(default=True, description="Include table styling information (borders, alignment, etc.)"),
output_format: str = Field(default="structured", description="Output format: structured, csv, json, markdown"),
preserve_merged_cells: bool = Field(default=True, description="Handle merged cells appropriately"),
include_headers: bool = Field(default=True, description="Identify and mark header rows/columns")
) -> dict[str, Any]:
"""Extract tables from Word documents with comprehensive structure analysis."""
start_time = time.time()
import csv
import json
import io
# Resolve and validate file
resolved_path = await resolve_office_file_path(file_path)
validation = await validate_office_file(resolved_path)
if validation["category"] != "word":
raise OfficeFileError(f"Table extraction requires Word document, got: {validation['format_name']}")
# Import required libraries
import docx
# Load document
doc = docx.Document(resolved_path)
tables_data = []
table_index = 0
for table in doc.tables:
table_info = {
"table_index": table_index,
"dimensions": {
"rows": len(table.rows),
"columns": len(table.columns) if table.rows else 0
},
"data": [],
"metadata": {}
}
# Extract table styling if requested
if include_styling:
table_info["styling"] = {
"table_style": table.style.name if table.style else None,
"alignment": str(table.alignment) if hasattr(table, 'alignment') else None
}
# Extract table data
for row_idx, row in enumerate(table.rows):
row_data = []
row_styling = [] if include_styling else None
for col_idx, cell in enumerate(row.cells):
cell_text = cell.text.strip()
cell_info = {"text": cell_text}
if include_styling:
cell_style = {
"bold": False,
"italic": False,
"alignment": None
}
# Check text formatting in paragraphs
for paragraph in cell.paragraphs:
for run in paragraph.runs:
if run.bold:
cell_style["bold"] = True
if run.italic:
cell_style["italic"] = True
if paragraph.alignment is not None:
cell_style["alignment"] = str(paragraph.alignment)
cell_info["styling"] = cell_style
row_styling.append(cell_style)
# Handle merged cells
if preserve_merged_cells:
# Basic merged cell detection (simplified)
cell_info["is_merged"] = len(cell.text.strip()) == 0 and col_idx > 0
row_data.append(cell_info)
table_info["data"].append({
"row_index": row_idx,
"cells": row_data,
"styling": row_styling if include_styling else None
})
# Identify headers if requested
if include_headers and table_info["data"]:
# Simple header detection: first row with all non-empty cells
first_row_cells = table_info["data"][0]["cells"]
if all(cell["text"] for cell in first_row_cells):
table_info["metadata"]["has_header_row"] = True
table_info["metadata"]["headers"] = [cell["text"] for cell in first_row_cells]
else:
table_info["metadata"]["has_header_row"] = False
# Convert to requested output format
if output_format in ["csv", "json", "markdown"]:
converted_data = self._convert_table_format(table_info, output_format)
table_info["converted_output"] = converted_data
tables_data.append(table_info)
table_index += 1
# Generate summary
total_tables = len(tables_data)
total_cells = sum(table["dimensions"]["rows"] * table["dimensions"]["columns"] for table in tables_data)
return {
"tables": tables_data,
"summary": {
"total_tables": total_tables,
"total_cells": total_cells,
"extraction_time": time.time() - start_time,
"output_format": output_format,
"file_info": validation
}
}
def _convert_table_format(self, table_info: dict, format_type: str) -> str:
"""Convert table data to specified format."""
rows_data = []
# Extract plain text data
for row in table_info["data"]:
row_texts = [cell["text"] for cell in row["cells"]]
rows_data.append(row_texts)
if format_type == "csv":
output = io.StringIO()
writer = csv.writer(output)
writer.writerows(rows_data)
return output.getvalue()
elif format_type == "json":
if table_info["metadata"].get("has_header_row", False):
headers = rows_data[0]
data_rows = rows_data[1:]
json_data = [dict(zip(headers, row)) for row in data_rows]
else:
json_data = [{"col_" + str(i): cell for i, cell in enumerate(row)} for row in rows_data]
return json.dumps(json_data, indent=2)
elif format_type == "markdown":
if not rows_data:
return ""
markdown = ""
for i, row in enumerate(rows_data):
# Escape pipe characters in cell content
escaped_row = [cell.replace("|", "\\|") for cell in row]
markdown += "| " + " | ".join(escaped_row) + " |\n"
# Add separator after header row
if i == 0 and table_info["metadata"].get("has_header_row", False):
markdown += "| " + " | ".join(["---"] * len(row)) + " |\n"
return markdown
return ""
@mcp_tool(
name="analyze_word_structure",
description="Analyze Word document structure including headings, sections, page layout, and document hierarchy. Provides navigation map and content organization insights."
)
@handle_office_errors("Structure analysis")
@resolve_field_defaults(
include_page_info=True,
extract_outline=True,
analyze_styles=True
)
async def analyze_word_structure(
self,
file_path: str = Field(description="Path to Word document or URL"),
include_page_info: bool = Field(default=True, description="Include page layout and section information"),
extract_outline: bool = Field(default=True, description="Extract document outline and heading hierarchy"),
analyze_styles: bool = Field(default=True, description="Analyze custom styles and formatting patterns")
) -> dict[str, Any]:
"""Analyze Word document structure and organization."""
start_time = time.time()
# Resolve and validate file
resolved_path = await resolve_office_file_path(file_path)
validation = await validate_office_file(resolved_path)
if validation["category"] != "word":
raise OfficeFileError(f"Structure analysis requires Word document, got: {validation['format_name']}")
# Import required libraries
import docx
from docx.enum.style import WD_STYLE_TYPE
# Load document
doc = docx.Document(resolved_path)
structure_info = {
"document_info": {
"total_paragraphs": len(doc.paragraphs),
"total_tables": len(doc.tables),
"total_sections": len(doc.sections)
}
}
# Extract outline and headings
if extract_outline:
headings = []
heading_styles = ['Heading 1', 'Heading 2', 'Heading 3', 'Heading 4', 'Heading 5', 'Heading 6']
for para_idx, paragraph in enumerate(doc.paragraphs):
if paragraph.style.name in heading_styles:
level = int(paragraph.style.name.split()[-1])
headings.append({
"text": paragraph.text.strip(),
"level": level,
"style": paragraph.style.name,
"paragraph_index": para_idx
})
structure_info["outline"] = {
"headings": headings,
"heading_count": len(headings),
"max_depth": max([h["level"] for h in headings]) if headings else 0
}
# Create navigation tree
structure_info["navigation_tree"] = self._build_navigation_tree(headings)
# Analyze page layout and sections
if include_page_info:
sections_info = []
for section_idx, section in enumerate(doc.sections):
section_info = {
"section_index": section_idx,
"page_dimensions": {},
"margins": {}
}
# Safely extract page dimensions
try:
if section.page_width:
section_info["page_dimensions"]["width"] = float(section.page_width.inches)
if section.page_height:
section_info["page_dimensions"]["height"] = float(section.page_height.inches)
except (ValueError, AttributeError, TypeError):
section_info["page_dimensions"] = {"width": None, "height": None}
# Safely extract margins
try:
if section.left_margin:
section_info["margins"]["left"] = float(section.left_margin.inches)
if section.right_margin:
section_info["margins"]["right"] = float(section.right_margin.inches)
if section.top_margin:
section_info["margins"]["top"] = float(section.top_margin.inches)
if section.bottom_margin:
section_info["margins"]["bottom"] = float(section.bottom_margin.inches)
except (ValueError, AttributeError, TypeError):
section_info["margins"] = {"left": None, "right": None, "top": None, "bottom": None}
# Safely extract orientation
try:
if hasattr(section, 'orientation') and section.orientation is not None:
# orientation is an enum, get its name
section_info["orientation"] = section.orientation.name if hasattr(section.orientation, 'name') else str(section.orientation)
else:
section_info["orientation"] = None
except (ValueError, AttributeError, TypeError):
section_info["orientation"] = None
# Header and footer information
try:
if section.header:
section_info["has_header"] = True
section_info["header_text"] = " ".join([p.text for p in section.header.paragraphs]).strip()
except (ValueError, AttributeError, TypeError):
section_info["has_header"] = False
try:
if section.footer:
section_info["has_footer"] = True
section_info["footer_text"] = " ".join([p.text for p in section.footer.paragraphs]).strip()
except (ValueError, AttributeError, TypeError):
section_info["has_footer"] = False
sections_info.append(section_info)
structure_info["page_layout"] = sections_info
# Analyze styles
if analyze_styles:
styles_info = {
"paragraph_styles": [],
"character_styles": [],
"table_styles": [],
"style_usage": {}
}
# Collect style information
for style in doc.styles:
style_info = {
"name": style.name,
"type": str(style.type),
"builtin": style.builtin
}
if style.type == WD_STYLE_TYPE.PARAGRAPH:
styles_info["paragraph_styles"].append(style_info)
elif style.type == WD_STYLE_TYPE.CHARACTER:
styles_info["character_styles"].append(style_info)
elif style.type == WD_STYLE_TYPE.TABLE:
styles_info["table_styles"].append(style_info)
# Analyze style usage
style_usage = {}
for paragraph in doc.paragraphs:
style_name = paragraph.style.name
style_usage[style_name] = style_usage.get(style_name, 0) + 1
styles_info["style_usage"] = style_usage
structure_info["styles"] = styles_info
return {
"structure": structure_info,
"analysis_time": time.time() - start_time,
"file_info": validation
}
def _build_navigation_tree(self, headings: list) -> list:
"""Build hierarchical navigation tree from headings."""
if not headings:
return []
tree = []
stack = [] # Stack to keep track of parent nodes
for heading in headings:
node = {
"text": heading["text"],
"level": heading["level"],
"paragraph_index": heading["paragraph_index"],
"children": []
}
# Find the correct parent level
while stack and stack[-1]["level"] >= heading["level"]:
stack.pop()
if stack:
# Add as child to the parent
stack[-1]["children"].append(node)
else:
# Add as root level
tree.append(node)
stack.append(node)
return tree
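# Example: headings at levels [1, 2, 2, 1] produce two root nodes; the two level-2
# headings are nested under the first root's "children" list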
# ==================== New Document Navigation Tools ====================
@mcp_tool(
name="get_document_outline",
description="Get a clean, structured outline of a Word document showing all headings, sections, and chapters with their locations. Perfect for understanding document structure before reading."
)
@handle_office_errors("Document outline")
async def get_document_outline(
self,
file_path: str = Field(description="Path to Word document or URL"),
include_word_counts: bool = Field(default=True, description="Include estimated word count per section"),
detect_chapters: bool = Field(default=True, description="Detect and flag chapter headings specifically")
) -> dict[str, Any]:
"""Extract structured document outline with chapter detection."""
from docx import Document
start_time = time.time()
local_path = await resolve_office_file_path(file_path)
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
doc = Document(local_path)
outline = []
current_section = None
section_word_count = 0
total_words = 0
chapter_pattern = ["chapter", "section", "part", "introduction", "conclusion", "appendix", "preface", "epilogue"]
for para_idx, para in enumerate(doc.paragraphs):
text = para.text.strip()
word_count = len(text.split()) if text else 0
total_words += word_count
# Check if this is a heading
style_name = para.style.name.lower() if para.style else ""
is_heading = "heading" in style_name or "title" in style_name
# Determine heading level
level = 0
if is_heading:
if "title" in style_name:
level = 0
elif "heading 1" in style_name or style_name == "heading1":
level = 1
elif "heading 2" in style_name or style_name == "heading2":
level = 2
elif "heading 3" in style_name or style_name == "heading3":
level = 3
elif "heading" in style_name:
# Try to extract number from style name
import re
match = re.search(r'heading\s*(\d+)', style_name)
level = int(match.group(1)) if match else 4
if is_heading and text:
# Save previous section's word count
if current_section is not None and include_word_counts:
current_section["word_count"] = section_word_count
# Detect if this is a chapter
is_chapter = False
chapter_number = None
if detect_chapters:
text_lower = text.lower()
for pattern in chapter_pattern:
if pattern in text_lower:
is_chapter = True
# Try to extract chapter number
import re
match = re.search(r'(?:chapter|section|part)\s*(\d+)', text_lower)
if match:
chapter_number = int(match.group(1))
break
current_section = {
"text": text[:150] + ("..." if len(text) > 150 else ""),
"level": level,
"style": para.style.name if para.style else "Unknown",
"paragraph_index": para_idx,
"is_chapter": is_chapter
}
if chapter_number is not None:
current_section["chapter_number"] = chapter_number
outline.append(current_section)
section_word_count = 0
else:
section_word_count += word_count
# Don't forget last section
if current_section is not None and include_word_counts:
current_section["word_count"] = section_word_count
# Build summary statistics
chapters = [item for item in outline if item.get("is_chapter")]
chapter_numbers = [c.get("chapter_number") for c in chapters if c.get("chapter_number")]
# Detect missing chapters
missing_chapters = []
if chapter_numbers:
expected = set(range(1, max(chapter_numbers) + 1))
found = set(chapter_numbers)
missing_chapters = sorted(expected - found)
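# Example: chapter numbers [1, 2, 4, 5] yield missing_chapters == [3]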
return {
"outline": outline,
"summary": {
"total_headings": len(outline),
"chapters_found": len(chapters),
"chapter_numbers": chapter_numbers,
"missing_chapters": missing_chapters,
"total_words": total_words,
"total_paragraphs": len(doc.paragraphs)
},
"extraction_time": round(time.time() - start_time, 3)
}
@mcp_tool(
name="check_style_consistency",
description="Analyze a Word document for style inconsistencies, formatting issues, and potential problems like mismatched heading styles or missing chapters."
)
@handle_office_errors("Style consistency check")
async def check_style_consistency(
self,
file_path: str = Field(description="Path to Word document or URL")
) -> dict[str, Any]:
"""Check document for style and formatting consistency issues."""
from docx import Document
start_time = time.time()
local_path = await resolve_office_file_path(file_path)
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
doc = Document(local_path)
issues = []
warnings = []
# Track heading styles and chapter detection
heading_styles = {}
chapters_by_style = {"heading": [], "other": []}
chapter_numbers_found = []
import re
chapter_pattern = re.compile(r'^chapter\s*(\d+)', re.IGNORECASE)
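# Matches paragraphs beginning with e.g. "Chapter 7" or "CHAPTER 12: The Return"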
for para_idx, para in enumerate(doc.paragraphs):
text = para.text.strip()
style_name = para.style.name if para.style else "None"
style_lower = style_name.lower()
# Track style usage
heading_styles[style_name] = heading_styles.get(style_name, 0) + 1
# Check for chapter-like text
chapter_match = chapter_pattern.match(text)
if chapter_match:
chapter_num = int(chapter_match.group(1))
chapter_numbers_found.append(chapter_num)
is_heading_style = "heading" in style_lower
if is_heading_style:
chapters_by_style["heading"].append({
"chapter": chapter_num,
"text": text[:80],
"style": style_name,
"paragraph": para_idx
})
else:
chapters_by_style["other"].append({
"chapter": chapter_num,
"text": text[:80],
"style": style_name,
"paragraph": para_idx
})
issues.append({
"type": "inconsistent_chapter_style",
"severity": "warning",
"message": f"Chapter {chapter_num} uses '{style_name}' instead of a Heading style",
"paragraph": para_idx,
"text": text[:80]
})
# Check for potential headings that aren't styled as headings
if text and len(text) < 100 and not text.endswith('.'):
is_heading_style = "heading" in style_lower or "title" in style_lower
looks_like_heading = any(word in text.lower() for word in
["chapter", "section", "part", "introduction", "conclusion", "appendix"])
if looks_like_heading and not is_heading_style:
warnings.append({
"type": "potential_heading_not_styled",
"message": f"Text looks like a heading but uses '{style_name}' style",
"paragraph": para_idx,
"text": text[:80]
})
# Check for missing chapters in sequence
missing_chapters = []
if chapter_numbers_found:
chapter_numbers_found.sort()
expected = set(range(1, max(chapter_numbers_found) + 1))
found = set(chapter_numbers_found)
missing_chapters = sorted(expected - found)
for missing in missing_chapters:
issues.append({
"type": "missing_chapter",
"severity": "error",
"message": f"Chapter {missing} appears to be missing from sequence",
"expected_between": f"Chapter {missing-1} and Chapter {missing+1}" if missing > 1 else f"Before Chapter {missing+1}"
})
# Check for duplicate chapter numbers
from collections import Counter
chapter_counts = Counter(chapter_numbers_found)
duplicates = {num: count for num, count in chapter_counts.items() if count > 1}
for chapter_num, count in duplicates.items():
issues.append({
"type": "duplicate_chapter",
"severity": "warning",
"message": f"Chapter {chapter_num} appears {count} times"
})
# Summary of heading style usage
heading_summary = {k: v for k, v in heading_styles.items()
if "heading" in k.lower() or "title" in k.lower()}
return {
"issues": issues,
"warnings": warnings,
"chapter_analysis": {
"total_chapters": len(chapter_numbers_found),
"chapters_with_heading_style": len(chapters_by_style["heading"]),
"chapters_without_heading_style": len(chapters_by_style["other"]),
"missing_chapters": missing_chapters,
"duplicate_chapters": list(duplicates.keys()),
"chapter_details": chapters_by_style
},
"style_usage": heading_summary,
"health_score": self._calculate_doc_health_score(issues, warnings),
"analysis_time": round(time.time() - start_time, 3)
}
def _calculate_doc_health_score(self, issues: list, warnings: list) -> dict:
"""Calculate document health score based on issues found."""
score = 100
for issue in issues:
if issue.get("severity") == "error":
score -= 10
elif issue.get("severity") == "warning":
score -= 5
for _ in warnings:
score -= 2
score = max(0, min(100, score))
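# Example: one error-severity issue (-10) plus three warnings (-2 each) gives 84 -> "good"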
if score >= 90:
rating = "excellent"
elif score >= 70:
rating = "good"
elif score >= 50:
rating = "fair"
else:
rating = "needs attention"
return {"score": score, "rating": rating}
@mcp_tool(
name="search_document",
description="Search for text within a Word document and return matches with surrounding context and location information."
)
@handle_office_errors("Document search")
async def search_document(
self,
file_path: str = Field(description="Path to Word document or URL"),
query: str = Field(description="Text to search for (case-insensitive)"),
context_chars: int = Field(default=100, description="Number of characters of context before and after match"),
max_results: int = Field(default=20, description="Maximum number of results to return")
) -> dict[str, Any]:
"""Search document for text with context."""
from docx import Document
start_time = time.time()
local_path = await resolve_office_file_path(file_path)
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
doc = Document(local_path)
query_lower = query.lower()
results = []
current_chapter = None
current_section = None
for para_idx, para in enumerate(doc.paragraphs):
text = para.text
style_name = para.style.name if para.style else ""
style_lower = style_name.lower()
# Track current chapter/section for context
if "heading" in style_lower or "title" in style_lower:
if "1" in style_name or "title" in style_lower:
current_chapter = text.strip()[:80]
current_section = None
else:
current_section = text.strip()[:80]
# Search for matches
text_lower = text.lower()
search_start = 0
while True:
pos = text_lower.find(query_lower, search_start)
if pos == -1:
break
if len(results) >= max_results:
break
# Extract context
context_start = max(0, pos - context_chars)
context_end = min(len(text), pos + len(query) + context_chars)
context = text[context_start:context_end]
if context_start > 0:
context = "..." + context
if context_end < len(text):
context = context + "..."
results.append({
"paragraph_index": para_idx,
"position": pos,
"context": context,
"chapter": current_chapter,
"section": current_section,
"style": style_name
})
search_start = pos + 1
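# Advance by one character so overlapping occurrences of the query are also found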
if len(results) >= max_results:
break
return {
"query": query,
"total_matches": len(results),
"results": results,
"search_time": round(time.time() - start_time, 3),
"truncated": len(results) >= max_results
}
@mcp_tool(
name="extract_entities",
description="Extract named entities (people, places, organizations) from a Word document using pattern-based recognition. Great for identifying key characters, locations, and institutions mentioned in the text."
)
@handle_office_errors("Entity extraction")
async def extract_entities(
self,
file_path: str = Field(description="Path to Word document or URL"),
entity_types: str = Field(default="all", description="Entity types to extract: 'all', 'people', 'places', 'organizations', or comma-separated combination"),
min_occurrences: int = Field(default=1, description="Minimum occurrences for an entity to be included"),
include_context: bool = Field(default=True, description="Include sample context for each entity")
) -> dict[str, Any]:
"""Extract named entities from document using pattern-based recognition."""
from docx import Document
from collections import defaultdict
import re
start_time = time.time()
local_path = await resolve_office_file_path(file_path)
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
doc = Document(local_path)
# Parse entity types to extract
if entity_types == "all":
extract_types = {"people", "places", "organizations"}
else:
extract_types = set(t.strip().lower() for t in entity_types.split(","))
# Entity containers with context tracking
entities = {
"people": defaultdict(lambda: {"count": 0, "contexts": []}),
"places": defaultdict(lambda: {"count": 0, "contexts": []}),
"organizations": defaultdict(lambda: {"count": 0, "contexts": []})
}
# Patterns for entity detection
# Titles indicating people
title_pattern = re.compile(
r'\b(Dr\.?|Mr\.?|Mrs\.?|Ms\.?|Miss|Professor|Prof\.?|Sister|Father|Rev\.?|'
r'President|Director|Nurse|RN|LPN|MD)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)',
re.IGNORECASE
)
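# Note: re.IGNORECASE also relaxes the [A-Z][a-z]+ name groups, so lowercase words
# following a title can be captured as names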
# Organization patterns
org_suffixes = re.compile(
r'\b([A-Z][a-zA-Z\s\'\-]+(?:Hospital|Medical Center|Center|Clinic|University|'
r'College|School|Association|Institute|Foundation|Department|Administration|'
r'Committee|Board|Agency|Service|Company|Inc|Corp|LLC|VA|ANA))\b'
)
# Place patterns (cities, states, geographic locations)
place_patterns = re.compile(
r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*),\s*((?:[A-Z]{2}|[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*))\b|'
r'\b((?:North|South|East|West)\s+[A-Z][a-z]+)\b|'
r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\s+(?:City|County|State|Valley|Mountain|River|Lake|Island)\b'
)
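# Captures three forms: "City, State" pairs, directional names ("North Fork"),
# and names followed by a geographic suffix ("... Valley", "... County")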
# Known US states for validation
us_states = {
'Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California', 'Colorado',
'Connecticut', 'Delaware', 'Florida', 'Georgia', 'Hawaii', 'Idaho',
'Illinois', 'Indiana', 'Iowa', 'Kansas', 'Kentucky', 'Louisiana',
'Maine', 'Maryland', 'Massachusetts', 'Michigan', 'Minnesota',
'Mississippi', 'Missouri', 'Montana', 'Nebraska', 'Nevada',
'New Hampshire', 'New Jersey', 'New Mexico', 'New York',
'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma', 'Oregon',
'Pennsylvania', 'Rhode Island', 'South Carolina', 'South Dakota',
'Tennessee', 'Texas', 'Utah', 'Vermont', 'Virginia', 'Washington',
'West Virginia', 'Wisconsin', 'Wyoming', 'DC', 'ID', 'WA', 'NY',
'CA', 'ND', 'MN', 'IA', 'MT', 'OR', 'NV', 'AZ', 'NM', 'CO', 'WY'
}
# Common first names for better people detection
common_titles = {'dr', 'mr', 'mrs', 'ms', 'miss', 'professor', 'prof',
'sister', 'father', 'rev', 'president', 'director', 'nurse'}
current_chapter = "Document Start"
for para_idx, para in enumerate(doc.paragraphs):
text = para.text
style_name = para.style.name if para.style else ""
# Track chapters for context
if "heading" in style_name.lower() and "1" in style_name:
current_chapter = text.strip()[:60]
# Skip very short paragraphs
if len(text) < 10:
continue
# Extract people
if "people" in extract_types:
for match in title_pattern.finditer(text):
title = match.group(1)
name = match.group(2).strip()
full_name = f"{title} {name}".strip()
# Clean up the name
if len(name) >= 2:
entities["people"][full_name]["count"] += 1
if include_context and len(entities["people"][full_name]["contexts"]) < 3:
# Get surrounding context
start = max(0, match.start() - 30)
end = min(len(text), match.end() + 50)
context = text[start:end].strip()
entities["people"][full_name]["contexts"].append({
"text": f"...{context}...",
"chapter": current_chapter,
"paragraph": para_idx
})
# Also look for standalone capitalized names after verbs
name_after_verb = re.finditer(
r'\b(?:said|told|asked|replied|answered|explained|noted|added|mentioned)\s+'
r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)\b',
text
)
for match in name_after_verb:
name = match.group(1).strip()
if len(name) >= 3 and name not in us_states:
entities["people"][name]["count"] += 1
if include_context and len(entities["people"][name]["contexts"]) < 3:
start = max(0, match.start() - 20)
end = min(len(text), match.end() + 40)
context = text[start:end].strip()
entities["people"][name]["contexts"].append({
"text": f"...{context}...",
"chapter": current_chapter,
"paragraph": para_idx
})
# Extract organizations
if "organizations" in extract_types:
for match in org_suffixes.finditer(text):
org_name = match.group(1).strip()
if len(org_name) >= 5:
entities["organizations"][org_name]["count"] += 1
if include_context and len(entities["organizations"][org_name]["contexts"]) < 3:
start = max(0, match.start() - 20)
end = min(len(text), match.end() + 40)
context = text[start:end].strip()
entities["organizations"][org_name]["contexts"].append({
"text": f"...{context}...",
"chapter": current_chapter,
"paragraph": para_idx
})
# Extract places
if "places" in extract_types:
for match in place_patterns.finditer(text):
# Try different capture groups
place = None
if match.group(1) and match.group(2): # City, State pattern
city = match.group(1).strip()
state = match.group(2).strip()
if state in us_states or len(state) == 2:
place = f"{city}, {state}"
elif match.group(3): # Directional places
place = match.group(3).strip()
elif match.group(4): # Geographic features
place = match.group(4).strip()
if place and len(place) >= 3:
entities["places"][place]["count"] += 1
if include_context and len(entities["places"][place]["contexts"]) < 3:
start = max(0, match.start() - 20)
end = min(len(text), match.end() + 40)
context = text[start:end].strip()
entities["places"][place]["contexts"].append({
"text": f"...{context}...",
"chapter": current_chapter,
"paragraph": para_idx
})
# Filter by minimum occurrences and prepare output
def filter_and_sort(entity_dict, min_count):
filtered = []
for name, data in entity_dict.items():
if data["count"] >= min_count:
entry = {
"name": name,
"occurrences": data["count"]
}
if include_context and data["contexts"]:
entry["sample_contexts"] = data["contexts"]
filtered.append(entry)
return sorted(filtered, key=lambda x: x["occurrences"], reverse=True)
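# Each returned entry looks like (values illustrative):
# {"name": "Dr. Carter", "occurrences": 12,
#  "sample_contexts": [{"text": "...", "chapter": "Chapter 1", "paragraph": 42}]}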
result = {
"entities": {},
"summary": {
"total_entities": 0,
"by_type": {}
},
"extraction_time": round(time.time() - start_time, 3)
}
for entity_type in extract_types:
if entity_type in entities:
filtered = filter_and_sort(entities[entity_type], min_occurrences)
result["entities"][entity_type] = filtered
result["summary"]["by_type"][entity_type] = len(filtered)
result["summary"]["total_entities"] += len(filtered)
return result
@mcp_tool(
name="get_chapter_summaries",
description="Get brief summaries/previews of each chapter in a Word document. Extracts the opening sentences of each chapter to give a quick overview of content."
)
@handle_office_errors("Chapter summaries")
async def get_chapter_summaries(
self,
file_path: str = Field(description="Path to Word document or URL"),
sentences_per_chapter: int = Field(default=3, description="Number of opening sentences to include per chapter"),
include_word_counts: bool = Field(default=True, description="Include word count for each chapter")
) -> dict[str, Any]:
"""Extract chapter summaries/previews from document."""
from docx import Document
import re
start_time = time.time()
local_path = await resolve_office_file_path(file_path)
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
doc = Document(local_path)
chapters = []
current_chapter = None
chapter_text = []
chapter_word_count = 0
chapter_pattern = re.compile(r'^chapter\s*(\d+)', re.IGNORECASE)
def extract_preview(text_paragraphs, num_sentences):
"""Extract first N sentences from collected paragraphs."""
full_text = " ".join(text_paragraphs)
# Simple sentence splitting
sentences = re.split(r'(?<=[.!?])\s+', full_text)
preview_sentences = sentences[:num_sentences]
return " ".join(preview_sentences).strip()
def save_current_chapter():
"""Save the current chapter's data."""
nonlocal current_chapter, chapter_text, chapter_word_count
if current_chapter:
preview = extract_preview(chapter_text, sentences_per_chapter)
chapter_data = {
"chapter_number": current_chapter["number"],
"title": current_chapter["title"],
"paragraph_index": current_chapter["paragraph_index"],
"preview": preview if preview else "(No text content found)",
}
if include_word_counts:
chapter_data["word_count"] = chapter_word_count
chapters.append(chapter_data)
for para_idx, para in enumerate(doc.paragraphs):
text = para.text.strip()
style_name = para.style.name if para.style else ""
# Check if this is a chapter heading
chapter_match = chapter_pattern.match(text)
if chapter_match:
# Save previous chapter first
save_current_chapter()
# Start new chapter
current_chapter = {
"number": int(chapter_match.group(1)),
"title": text[:100],
"paragraph_index": para_idx
}
chapter_text = []
chapter_word_count = 0
elif current_chapter:
# Accumulate text for current chapter
if text:
word_count = len(text.split())
chapter_word_count += word_count
# Only collect first portion of text for preview
if len(" ".join(chapter_text)) < 1000:
chapter_text.append(text)
# Don't forget the last chapter
save_current_chapter()
# Calculate statistics
total_words = sum(c.get("word_count", 0) for c in chapters)
avg_words = total_words // len(chapters) if chapters else 0
return {
"chapters": chapters,
"summary": {
"total_chapters": len(chapters),
"total_words": total_words,
"average_words_per_chapter": avg_words,
"shortest_chapter": min((c for c in chapters), key=lambda x: x.get("word_count", 0), default=None),
"longest_chapter": max((c for c in chapters), key=lambda x: x.get("word_count", 0), default=None)
},
"extraction_time": round(time.time() - start_time, 3)
}
@mcp_tool(
name="save_reading_progress",
description="Save your reading progress in a Word document. Creates a bookmark file to track which chapter/paragraph you're on, so you can resume reading later."
)
@handle_office_errors("Save reading progress")
async def save_reading_progress(
self,
file_path: str = Field(description="Path to Word document"),
chapter_number: int = Field(default=1, description="Current chapter number"),
paragraph_index: int = Field(default=0, description="Current paragraph index"),
notes: str = Field(default="", description="Optional notes about where you left off")
) -> dict[str, Any]:
"""Save reading progress to a bookmark file."""
import json
from datetime import datetime
local_path = await resolve_office_file_path(file_path)
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Create bookmark file path (same location as document)
doc_dir = os.path.dirname(local_path)
doc_name = os.path.splitext(os.path.basename(local_path))[0]
bookmark_path = os.path.join(doc_dir, f".{doc_name}.reading_progress.json")
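# e.g. "Manuscript.docx" gets a sidecar ".Manuscript.reading_progress.json" in the
# same directory (example filename for illustration)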
# Load existing bookmarks or create new
bookmarks = {"history": []}
if os.path.exists(bookmark_path):
try:
with open(bookmark_path, 'r') as f:
bookmarks = json.load(f)
except (json.JSONDecodeError, IOError):
bookmarks = {"history": []}
# Create new bookmark entry
bookmark = {
"timestamp": datetime.now().isoformat(),
"chapter": chapter_number,
"paragraph_index": paragraph_index,
"notes": notes
}
# Update current position and add to history
bookmarks["current"] = bookmark
bookmarks["document"] = os.path.basename(local_path)
bookmarks["history"].append(bookmark)
# Keep only last 50 history entries
if len(bookmarks["history"]) > 50:
bookmarks["history"] = bookmarks["history"][-50:]
# Save bookmark file
with open(bookmark_path, 'w') as f:
json.dump(bookmarks, f, indent=2)
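# Saved file layout: {"document": <basename>, "current": <latest bookmark>,
#  "history": [<up to 50 most recent bookmarks>]}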
return {
"saved": True,
"bookmark_file": bookmark_path,
"position": {
"chapter": chapter_number,
"paragraph_index": paragraph_index
},
"notes": notes,
"timestamp": bookmark["timestamp"],
"history_entries": len(bookmarks["history"])
}
@mcp_tool(
name="get_reading_progress",
description="Retrieve your saved reading progress for a Word document. Shows where you left off and your reading history."
)
@handle_office_errors("Get reading progress")
async def get_reading_progress(
self,
file_path: str = Field(description="Path to Word document")
) -> dict[str, Any]:
"""Retrieve saved reading progress from bookmark file."""
import json
local_path = await resolve_office_file_path(file_path)
validation = await validate_office_file(local_path)
if not validation["is_valid"]:
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
# Find bookmark file
doc_dir = os.path.dirname(local_path)
doc_name = os.path.splitext(os.path.basename(local_path))[0]
bookmark_path = os.path.join(doc_dir, f".{doc_name}.reading_progress.json")
if not os.path.exists(bookmark_path):
return {
"has_progress": False,
"message": "No reading progress saved for this document. Use save_reading_progress to save your position."
}
# Load bookmarks
try:
with open(bookmark_path, 'r') as f:
bookmarks = json.load(f)
except (json.JSONDecodeError, IOError) as e:
return {
"has_progress": False,
"error": f"Could not read bookmark file: {str(e)}"
}
current = bookmarks.get("current", {})
history = bookmarks.get("history", [])
return {
"has_progress": True,
"document": bookmarks.get("document", os.path.basename(local_path)),
"current_position": {
"chapter": current.get("chapter"),
"paragraph_index": current.get("paragraph_index"),
"notes": current.get("notes", ""),
"last_read": current.get("timestamp")
},
"reading_sessions": len(history),
"recent_history": history[-5:] if history else [],
"bookmark_file": bookmark_path
}