Compare commits
No commits in common. "11defb4eaee3003997bc98f4bb0feac5a0e7bae2" and "35869b60991a46c007c3922f6ae1707628dd9ac2" have entirely different histories.
11defb4eae...35869b6099

.gitignore (vendored, 9 lines changed)
@@ -78,12 +78,3 @@ tmp/
 # Temporary files created during processing
 *.tmp
 *.temp
-
-# Test documents (personal/private)
-ORIGINAL - The Other Side of the Bed*.docx
-
-# Reading progress bookmarks (user-specific)
-.*.reading_progress.json
-
-# Local MCP config
-.mcp.json
README.md (25 lines changed)
@@ -83,13 +83,6 @@ claude mcp add office-tools "uvx mcp-office-tools"
 | `convert_to_markdown` | Convert to Markdown with automatic pagination for large docs |
 | `extract_word_tables` | Extract tables as structured JSON, CSV, or Markdown |
 | `analyze_word_structure` | Analyze headings, sections, styles, and document hierarchy |
-| `get_document_outline` | Get structured outline with chapter detection and word counts |
-| `check_style_consistency` | Find formatting issues, missing chapters, style problems |
-| `search_document` | Search text with context and chapter location |
-| `extract_entities` | Extract people, places, organizations using pattern recognition |
-| `get_chapter_summaries` | Generate chapter previews with opening sentences |
-| `save_reading_progress` | Bookmark your reading position for later |
-| `get_reading_progress` | Resume reading from saved position |

 ### Excel Tools

@@ -124,24 +117,6 @@ Here's what works and what's "good enough" — legacy formats from Office 97-2003

 ---
-
-## 🎯 MCP Prompts
-
-Pre-built workflows that chain multiple tools together. Use these as starting points:
-
-| Prompt | Level | Description |
-|--------|-------|-------------|
-| `explore-document` | Basic | Start with any new document - get structure and identify issues |
-| `find-character` | Basic | Track all mentions of a person/character with context |
-| `chapter-preview` | Basic | Quick overview of each chapter without full read |
-| `resume-reading` | Intermediate | Check saved position and continue reading |
-| `document-analysis` | Intermediate | Comprehensive multi-tool analysis |
-| `character-journey` | Advanced | Track character arc through entire narrative |
-| `document-comparison` | Advanced | Compare entities and themes between chapters |
-| `full-reading-session` | Advanced | Guided reading with bookmarking |
-| `manuscript-review` | Advanced | Complete editorial workflow for editors |
-
----

 ## 💡 Usage Examples

 ### Extract Text from Any Document
@@ -1,181 +0,0 @@
# DOCX Processing Fixes

This document captures critical bugs discovered and fixed while processing complex Word documents (specifically a 200+ page manuscript with 10 chapters).

## Summary

| # | Bug | Impact | Root Cause |
|---|-----|--------|------------|
| 1 | FastMCP banner corruption | MCP connection fails | ASCII art breaks JSON-RPC |
| 2 | Page range cap | Wrong content extracted | Used max page# instead of count |
| 3 | Heading scan limit | Chapters not found | Only scanned first 100 elements |
| 4 | Short-text fallback logic | Chapters not found | `elif` prevented fallback |
| 5 | **xpath API mismatch** | **Complete silent failure** | **python-docx != lxml API** |
| 6 | Image mode default | Response too large | Base64 bloats output |

---

## 1. FastMCP Banner Corruption

**File:** `src/mcp_office_tools/server.py`

**Symptom:** MCP connection fails with `Invalid JSON: EOF while parsing`

**Cause:** FastMCP's default startup banner prints ASCII art to stdout, corrupting the JSON-RPC protocol on stdio transport.

**Fix:**
```python
def main():
    # CRITICAL: show_banner=False is required for stdio transport!
    # FastMCP's banner prints ASCII art to stdout which breaks JSON-RPC protocol
    app.run(show_banner=False)
```
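
More generally, on stdio transport stdout must carry nothing but JSON-RPC frames. A minimal illustration of the habit that avoids this whole class of bug (the message text is only an example): any human-readable startup or debug output goes to stderr, never stdout.

```python
import sys

# Safe: stderr is not part of the stdio JSON-RPC channel
print("mcp-office-tools starting", file=sys.stderr)

# Unsafe on stdio transport: stdout output interleaves with JSON-RPC frames
# print("mcp-office-tools starting")
```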

---

## 2. Page Range Cap Bug

**File:** `src/mcp_office_tools/utils/word_processing.py`

**Symptom:** Requesting pages 1-5 returns truncated content, but pages 195-200 returns everything.

**Cause:** The paragraph limit was calculated using the *maximum page number* instead of the *count of pages requested*.

**Before:**
```python
max_paragraphs = max(page_numbers) * 50  # pages 1-5 = 250 max, pages 195-200 = 10,000 max!
```

**After:**
```python
num_pages_requested = len(page_numbers)  # pages 1-5 = 5, pages 195-200 = 6
max_paragraphs = num_pages_requested * 300  # Generous limit per page
max_chars = num_pages_requested * 50000
```

---

## 3. Heading Scan Limit Bug

**File:** `src/mcp_office_tools/utils/word_processing.py`

**Symptom:** `_get_available_headings()` returns empty list for documents with chapters beyond the first few pages.

**Cause:** The function only scanned the first 100 body elements, but Chapter 10 was at element 1524.

**Before:**
```python
for element in doc.element.body[:100]:  # Only first 100 elements!
    # find headings...
```

**After:**
```python
for element in doc.element.body:  # Scan ALL elements
    if len(headings) >= 30:
        break  # Limit output, not search
    # find headings...
```

---

## 4. Short-Text Fallback Logic Bug

**File:** `src/mcp_office_tools/utils/word_processing.py`

**Symptom:** Chapter search fails even when chapter text exists and is under 100 characters.

**Cause:** The `elif` for short-text detection was attached to `if style_elem`, meaning it only ran when NO style existed. Paragraphs with any non-heading style (Normal, BodyText, etc.) skipped the fallback entirely.

**Before:**
```python
if style_elem:
    if 'heading' in style_val.lower():
        chapter_start_idx = elem_idx
        break
elif len(text_content.strip()) < 100:  # Only runs if style_elem is empty!
    chapter_start_idx = elem_idx
    break
```

**After:**
```python
is_heading_style = False
if style_elem:
    style_val = style_elem[0].get(...)
    is_heading_style = 'heading' in style_val.lower()

# Independent check - runs regardless of whether style exists
if is_heading_style or len(text_content.strip()) < 100:
    chapter_start_idx = elem_idx
    break
```

---

## 5. Critical xpath API Mismatch (ROOT CAUSE)

**File:** `src/mcp_office_tools/utils/word_processing.py`

**Symptom:** Chapter search always returns "not found" even for chapters that clearly exist.

**Cause:** python-docx wraps lxml elements with custom classes (`CT_Document`, `CT_Body`, `CT_P`) that override `xpath()` with a **different method signature**. Standard lxml accepts `xpath(expr, namespaces={...})`, but python-docx's version **rejects the `namespaces` keyword argument**.

All 8 xpath calls were wrapped in try/except blocks, so they **silently failed** - the chapter search never actually executed.

**Before (silently fails):**
```python
# These all throw: "BaseOxmlElement.xpath() got an unexpected keyword argument 'namespaces'"
text_elems = para.xpath('.//w:t', namespaces={'w': 'http://...'})
style_elem = para.xpath('.//w:pStyle', namespaces={'w': 'http://...'})
```

**After (works correctly):**
```python
from docx.oxml.ns import qn

# Use findall() with qn() helper for text elements
text_elems = para.findall('.//' + qn('w:t'))
text_content = ''.join(t.text or '' for t in text_elems)

# Use find() chain for nested elements (pStyle is inside pPr)
pPr = para.find(qn('w:pPr'))
if pPr is not None:
    pStyle = pPr.find(qn('w:pStyle'))
    if pStyle is not None:
        style_val = pStyle.get(qn('w:val'), '')
```

**Key Insight:** The `qn()` function from `docx.oxml.ns` converts prefixed names like `'w:t'` to their fully qualified form `'{http://...}t'`, which works with python-docx's element methods.
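
For reference, this is what `qn()` produces; the namespace URI shown is the standard WordprocessingML one, and the snippet is purely illustrative:

```python
from docx.oxml.ns import qn

# qn() expands a prefixed tag name to the Clark notation that find()/findall() accept
print(qn('w:t'))
# {http://schemas.openxmlformats.org/wordprocessingml/2006/main}t
```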

---

## 6. Image Mode Default

**File:** `src/mcp_office_tools/mixins/word.py`

**Symptom:** Responses exceed token limits when documents contain images.

**Cause:** Default `image_mode="base64"` embeds full image data inline, bloating responses.

**Fix:**
```python
image_mode: str = Field(
    default="files",  # Changed from "base64"
    description="Image handling mode: 'files' (saves to disk), 'base64' (embeds inline), 'references' (metadata only)"
)
```

---

## Lessons Learned

1. **Silent failures are dangerous.** Wrapping xpath calls in try/except hid the API mismatch for months. Consider logging exceptions even when swallowing them (see the sketch after this list).

2. **Test with real documents.** Unit tests with mocked data passed, but real documents exposed the xpath API issue immediately.

3. **python-docx is not lxml.** Despite being built on lxml, python-docx's element classes have different method signatures. Always use `qn()` and `findall()`/`find()` instead of `xpath()` with namespace dicts.

4. **Check your loop bounds.** Scanning "first 100 elements" seemed reasonable but failed for long documents. Limit the *output*, not the *search*.

5. **Understand your conditionals.** The `if/elif` logic bug is subtle - the fallback was syntactically correct but semantically wrong for the use case.
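
A minimal sketch of what "log even when swallowing" could look like around one of these lookups; the logger name and the surrounding variables (`para`, `elem_idx`) are illustrative, not the project's actual code:

```python
import logging

logger = logging.getLogger("mcp_office_tools.word_processing")

try:
    text_elems = para.findall('.//' + qn('w:t'))
except Exception:
    # Still swallow the error, but leave a trace so API mismatches surface in logs
    logger.exception("Failed to read w:t elements for element %d", elem_idx)
    text_elems = []
```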
@@ -1,18 +1,154 @@
 {
   "metadata": {
-    "start_time": "2026-01-11T07:15:14.417108",
+    "start_time": "2026-01-11T00:28:31.202459",
+    "end_time": "2026-01-11T00:28:33.718606",
+    "duration": 1.2442383766174316,
+    "exit_status": 0,
     "pytest_version": "9.0.2",
-    "end_time": "2026-01-11T07:15:15.173732",
-    "duration": 0.7566196918487549,
-    "exit_status": 0
+    "test_types": [
+      "pytest",
+      "torture_test"
+    ]
   },
   "summary": {
-    "total": 0,
-    "passed": 0,
+    "total": 6,
+    "passed": 5,
     "failed": 0,
-    "skipped": 0,
-    "pass_rate": 0
+    "skipped": 1,
+    "pass_rate": 83.33333333333334
   },
-  "categories": {},
-  "tests": []
+  "categories": {
+    "Excel": {
+      "total": 4,
+      "passed": 3,
+      "failed": 0,
+      "skipped": 1
+    },
+    "Word": {
+      "total": 2,
+      "passed": 2,
+      "failed": 0,
+      "skipped": 0
+    }
+  },
+  "tests": [
+    {
+      "name": "Excel Data Analysis",
+      "nodeid": "torture_test.py::test_excel_data_analysis",
+      "category": "Excel",
+      "outcome": "passed",
+      "duration": 0.17873024940490723,
+      "timestamp": "2026-01-11T00:28:33.696485",
+      "module": "torture_test",
+      "class": null,
+      "function": "test_excel_data_analysis",
+      "inputs": {
+        "file": "test_files/test_data.xlsx"
+      },
+      "outputs": {
+        "sheets_analyzed": [
+          "Test Data"
+        ]
+      },
+      "error": null,
+      "traceback": null
+    },
+    {
+      "name": "Excel Formula Extraction",
+      "nodeid": "torture_test.py::test_excel_formula_extraction",
+      "category": "Excel",
+      "outcome": "passed",
+      "duration": 0.0032067298889160156,
+      "timestamp": "2026-01-11T00:28:33.699697",
+      "module": "torture_test",
+      "class": null,
+      "function": "test_excel_formula_extraction",
+      "inputs": {
+        "file": "test_files/test_data.xlsx"
+      },
+      "outputs": {
+        "total_formulas": 8
+      },
+      "error": null,
+      "traceback": null
+    },
+    {
+      "name": "Excel Chart Data Generation",
+      "nodeid": "torture_test.py::test_excel_chart_generation",
+      "category": "Excel",
+      "outcome": "passed",
+      "duration": 0.0025446414947509766,
+      "timestamp": "2026-01-11T00:28:33.702246",
+      "module": "torture_test",
+      "class": null,
+      "function": "test_excel_chart_generation",
+      "inputs": {
+        "file": "test_files/test_data.xlsx",
+        "x_column": "Category",
+        "y_columns": [
+          "Value"
+        ]
+      },
+      "outputs": {
+        "chart_libraries": 2
+      },
+      "error": null,
+      "traceback": null
+    },
+    {
+      "name": "Word Structure Analysis",
+      "nodeid": "torture_test.py::test_word_structure_analysis",
+      "category": "Word",
+      "outcome": "passed",
+      "duration": 0.010314226150512695,
+      "timestamp": "2026-01-11T00:28:33.712565",
+      "module": "torture_test",
+      "class": null,
+      "function": "test_word_structure_analysis",
+      "inputs": {
+        "file": "test_files/test_document.docx"
+      },
+      "outputs": {
+        "total_headings": 0
+      },
+      "error": null,
+      "traceback": null
+    },
+    {
+      "name": "Word Table Extraction",
+      "nodeid": "torture_test.py::test_word_table_extraction",
+      "category": "Word",
+      "outcome": "passed",
+      "duration": 0.005824089050292969,
+      "timestamp": "2026-01-11T00:28:33.718393",
+      "module": "torture_test",
+      "class": null,
+      "function": "test_word_table_extraction",
+      "inputs": {
+        "file": "test_files/test_document.docx"
+      },
+      "outputs": {
+        "total_tables": 0
+      },
+      "error": null,
+      "traceback": null
+    },
+    {
+      "name": "Real Excel File Analysis (FORScan)",
+      "nodeid": "torture_test.py::test_real_excel_analysis",
+      "category": "Excel",
+      "outcome": "skipped",
+      "duration": 0,
+      "timestamp": "2026-01-11T00:28:33.718405",
+      "module": "torture_test",
+      "class": null,
+      "function": "test_real_excel_analysis",
+      "inputs": {
+        "file": "/home/rpm/FORScan Lite spreadsheets v1.1/FORScan Lite spreadsheet - PIDs.xlsx"
+      },
+      "outputs": null,
+      "error": "File not found: /home/rpm/FORScan Lite spreadsheets v1.1/FORScan Lite spreadsheet - PIDs.xlsx",
+      "traceback": null
+    }
+  ]
 }
@@ -293,7 +293,7 @@ class UniversalMixin(MCPMixin):
     async def _extract_text_by_category(self, file_path: str, extension: str, category: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
         """Extract text based on document category."""
         # Import the appropriate extraction function
-        from ..utils import _extract_word_text, _extract_excel_text, _extract_powerpoint_text
+        from ..server_monolithic import _extract_word_text, _extract_excel_text, _extract_powerpoint_text

         if category == "word":
             return await _extract_word_text(file_path, extension, preserve_formatting, method)
@@ -306,7 +306,7 @@ class UniversalMixin(MCPMixin):

     async def _extract_images_by_category(self, file_path: str, extension: str, category: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
         """Extract images based on document category."""
-        from ..utils import _extract_word_images, _extract_excel_images, _extract_powerpoint_images
+        from ..server_monolithic import _extract_word_images, _extract_excel_images, _extract_powerpoint_images

         if category == "word":
             return await _extract_word_images(file_path, extension, output_format, min_width, min_height)
@@ -319,7 +319,7 @@ class UniversalMixin(MCPMixin):

     async def _extract_metadata_by_category(self, file_path: str, extension: str, category: str) -> dict[str, Any]:
         """Extract metadata based on document category."""
-        from ..utils import _extract_word_metadata, _extract_excel_metadata, _extract_powerpoint_metadata, _extract_basic_metadata
+        from ..server_monolithic import _extract_word_metadata, _extract_excel_metadata, _extract_powerpoint_metadata, _extract_basic_metadata

         # Get basic metadata first
         metadata = await _extract_basic_metadata(file_path, extension, category)
@@ -339,5 +339,5 @@ class UniversalMixin(MCPMixin):

     async def _extract_basic_metadata(self, file_path: str, extension: str, category: str) -> dict[str, Any]:
         """Extract basic metadata common to all documents."""
-        from ..utils import _extract_basic_metadata
+        from ..server_monolithic import _extract_basic_metadata
         return await _extract_basic_metadata(file_path, extension, category)
@@ -44,15 +44,15 @@ class WordMixin(MCPMixin):
     async def convert_to_markdown(
         self,
         file_path: str = Field(description="Path to Office document or URL"),
-        include_images: bool = Field(default=True, description="Include images in markdown output. When True, images are extracted to files and linked in the markdown."),
-        image_mode: str = Field(default="files", description="Image handling mode: 'files' (default, saves to disk and links), 'base64' (embeds inline - WARNING: can create massive responses), or 'references' (metadata only, no content)"),
-        max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding (only used when image_mode='base64')"),
+        include_images: bool = Field(default=True, description="Include images in markdown with base64 encoding or file references"),
+        image_mode: str = Field(default="base64", description="Image handling mode: 'base64', 'files', or 'references'"),
+        max_image_size: int = Field(default=1024*1024, description="Maximum image size in bytes for base64 encoding"),
         preserve_structure: bool = Field(default=True, description="Preserve document structure (headings, lists, tables)"),
         page_range: str = Field(default="", description="Page range to convert (e.g., '1-5', '3', '1,3,5-10'). RECOMMENDED for large documents. Empty = all pages"),
         bookmark_name: str = Field(default="", description="Extract content for a specific bookmark/chapter (e.g., 'Chapter1_Start'). More reliable than page ranges."),
         chapter_name: str = Field(default="", description="Extract content for a chapter by heading text (e.g., 'Chapter 1', 'Introduction'). Works when bookmarks aren't available."),
         summary_only: bool = Field(default=False, description="Return only metadata and truncated summary. STRONGLY RECOMMENDED for large docs (>10 pages)"),
-        output_dir: str = Field(default="", description="Output directory for extracted image files. If empty, uses a temp directory based on document name."),
+        output_dir: str = Field(default="", description="Output directory for image files (if image_mode='files')"),
         # Pagination parameters
         limit: int = Field(default=50, description="Maximum number of document sections to return per page"),
         cursor_id: Optional[str] = Field(default=None, description="Cursor ID for pagination continuation"),
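
The `page_range` grammar described in these fields ('1-5', '3', '1,3,5-10') is expanded into a page list by the `_parse_page_range` helper shown in the next hunk. A rough sketch of that grammar, for illustration only and not the project's actual implementation:

```python
def parse_page_range(page_range: str) -> list[int]:
    """Expand a range string like '1,3,5-10' into sorted page numbers."""
    pages: set[int] = set()
    for part in page_range.split(","):
        part = part.strip()
        if not part:
            continue
        if "-" in part:
            start, end = part.split("-", 1)
            pages.update(range(int(start), int(end) + 1))
        else:
            pages.add(int(part))
    return sorted(pages)

# parse_page_range("1,3,5-10") -> [1, 3, 5, 6, 7, 8, 9, 10]
```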
@@ -225,17 +225,17 @@ class WordMixin(MCPMixin):
     # Helper methods - import from monolithic server
     async def _analyze_document_size(self, file_path: str, extension: str) -> dict[str, Any]:
         """Analyze document size for processing recommendations."""
-        from ..utils import _analyze_document_size
+        from ..server_monolithic import _analyze_document_size
         return await _analyze_document_size(file_path, extension)

     def _get_processing_recommendation(self, doc_analysis: dict[str, Any], page_range: str, summary_only: bool) -> dict[str, Any]:
         """Get processing recommendations based on document analysis."""
-        from ..utils import _get_processing_recommendation
+        from ..server_monolithic import _get_processing_recommendation
         return _get_processing_recommendation(doc_analysis, page_range, summary_only)

     def _parse_page_range(self, page_range: str) -> list[int]:
         """Parse page range string into list of page numbers."""
-        from ..utils import _parse_page_range
+        from ..server_monolithic import _parse_page_range
         return _parse_page_range(page_range)

     async def _convert_docx_to_markdown(
@@ -244,7 +244,7 @@ class WordMixin(MCPMixin):
         bookmark_name: str = "", chapter_name: str = ""
     ) -> dict[str, Any]:
         """Convert .docx to markdown."""
-        from ..utils import _convert_docx_to_markdown
+        from ..server_monolithic import _convert_docx_to_markdown
         return await _convert_docx_to_markdown(
             file_path, include_images, image_mode, max_image_size,
             preserve_structure, page_numbers, summary_only, output_dir, bookmark_name, chapter_name
@@ -255,7 +255,7 @@ class WordMixin(MCPMixin):
         preserve_structure: bool, page_numbers: list[int], summary_only: bool, output_dir: str
     ) -> dict[str, Any]:
         """Convert legacy .doc to markdown."""
-        from ..utils import _convert_doc_to_markdown
+        from ..server_monolithic import _convert_doc_to_markdown
         return await _convert_doc_to_markdown(
             file_path, include_images, image_mode, max_image_size,
             preserve_structure, page_numbers, summary_only, output_dir
@@ -635,802 +635,3 @@ class WordMixin(MCPMixin):
            stack.append(node)

        return tree

    # ==================== New Document Navigation Tools ====================

    @mcp_tool(
        name="get_document_outline",
        description="Get a clean, structured outline of a Word document showing all headings, sections, and chapters with their locations. Perfect for understanding document structure before reading."
    )
    @handle_office_errors("Document outline")
    async def get_document_outline(
        self,
        file_path: str = Field(description="Path to Word document or URL"),
        include_word_counts: bool = Field(default=True, description="Include estimated word count per section"),
        detect_chapters: bool = Field(default=True, description="Detect and flag chapter headings specifically")
    ) -> dict[str, Any]:
        """Extract structured document outline with chapter detection."""
        from docx import Document
        from docx.oxml.ns import qn

        start_time = time.time()
        local_path = await resolve_office_file_path(file_path)

        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        doc = Document(local_path)

        outline = []
        current_section = None
        section_word_count = 0
        total_words = 0
        chapter_pattern = ["chapter", "section", "part", "introduction", "conclusion", "appendix", "preface", "epilogue"]

        for para_idx, para in enumerate(doc.paragraphs):
            text = para.text.strip()
            word_count = len(text.split()) if text else 0
            total_words += word_count

            # Check if this is a heading
            style_name = para.style.name.lower() if para.style else ""
            is_heading = "heading" in style_name or "title" in style_name

            # Determine heading level
            level = 0
            if is_heading:
                if "title" in style_name:
                    level = 0
                elif "heading 1" in style_name or style_name == "heading1":
                    level = 1
                elif "heading 2" in style_name or style_name == "heading2":
                    level = 2
                elif "heading 3" in style_name or style_name == "heading3":
                    level = 3
                elif "heading" in style_name:
                    # Try to extract number from style name
                    import re
                    match = re.search(r'heading\s*(\d+)', style_name)
                    level = int(match.group(1)) if match else 4

            if is_heading and text:
                # Save previous section's word count
                if current_section is not None and include_word_counts:
                    current_section["word_count"] = section_word_count

                # Detect if this is a chapter
                is_chapter = False
                chapter_number = None
                if detect_chapters:
                    text_lower = text.lower()
                    for pattern in chapter_pattern:
                        if pattern in text_lower:
                            is_chapter = True
                            # Try to extract chapter number
                            import re
                            match = re.search(r'(?:chapter|section|part)\s*(\d+)', text_lower)
                            if match:
                                chapter_number = int(match.group(1))
                            break

                current_section = {
                    "text": text[:150] + ("..." if len(text) > 150 else ""),
                    "level": level,
                    "style": para.style.name if para.style else "Unknown",
                    "paragraph_index": para_idx,
                    "is_chapter": is_chapter
                }

                if chapter_number is not None:
                    current_section["chapter_number"] = chapter_number

                outline.append(current_section)
                section_word_count = 0
            else:
                section_word_count += word_count

        # Don't forget last section
        if current_section is not None and include_word_counts:
            current_section["word_count"] = section_word_count

        # Build summary statistics
        chapters = [item for item in outline if item.get("is_chapter")]
        chapter_numbers = [c.get("chapter_number") for c in chapters if c.get("chapter_number")]

        # Detect missing chapters
        missing_chapters = []
        if chapter_numbers:
            expected = set(range(1, max(chapter_numbers) + 1))
            found = set(chapter_numbers)
            missing_chapters = sorted(expected - found)

        return {
            "outline": outline,
            "summary": {
                "total_headings": len(outline),
                "chapters_found": len(chapters),
                "chapter_numbers": chapter_numbers,
                "missing_chapters": missing_chapters,
                "total_words": total_words,
                "total_paragraphs": len(doc.paragraphs)
            },
            "extraction_time": round(time.time() - start_time, 3)
        }

    @mcp_tool(
        name="check_style_consistency",
        description="Analyze a Word document for style inconsistencies, formatting issues, and potential problems like mismatched heading styles or missing chapters."
    )
    @handle_office_errors("Style consistency check")
    async def check_style_consistency(
        self,
        file_path: str = Field(description="Path to Word document or URL")
    ) -> dict[str, Any]:
        """Check document for style and formatting consistency issues."""
        from docx import Document

        start_time = time.time()
        local_path = await resolve_office_file_path(file_path)

        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        doc = Document(local_path)

        issues = []
        warnings = []

        # Track heading styles and chapter detection
        heading_styles = {}
        chapters_by_style = {"heading": [], "other": []}
        chapter_numbers_found = []

        import re
        chapter_pattern = re.compile(r'^chapter\s*(\d+)', re.IGNORECASE)

        for para_idx, para in enumerate(doc.paragraphs):
            text = para.text.strip()
            style_name = para.style.name if para.style else "None"
            style_lower = style_name.lower()

            # Track style usage
            heading_styles[style_name] = heading_styles.get(style_name, 0) + 1

            # Check for chapter-like text
            chapter_match = chapter_pattern.match(text)
            if chapter_match:
                chapter_num = int(chapter_match.group(1))
                chapter_numbers_found.append(chapter_num)

                is_heading_style = "heading" in style_lower

                if is_heading_style:
                    chapters_by_style["heading"].append({
                        "chapter": chapter_num,
                        "text": text[:80],
                        "style": style_name,
                        "paragraph": para_idx
                    })
                else:
                    chapters_by_style["other"].append({
                        "chapter": chapter_num,
                        "text": text[:80],
                        "style": style_name,
                        "paragraph": para_idx
                    })
                    issues.append({
                        "type": "inconsistent_chapter_style",
                        "severity": "warning",
                        "message": f"Chapter {chapter_num} uses '{style_name}' instead of a Heading style",
                        "paragraph": para_idx,
                        "text": text[:80]
                    })

            # Check for potential headings that aren't styled as headings
            if text and len(text) < 100 and not text.endswith('.'):
                is_heading_style = "heading" in style_lower or "title" in style_lower
                looks_like_heading = any(word in text.lower() for word in
                                         ["chapter", "section", "part", "introduction", "conclusion", "appendix"])

                if looks_like_heading and not is_heading_style:
                    warnings.append({
                        "type": "potential_heading_not_styled",
                        "message": f"Text looks like a heading but uses '{style_name}' style",
                        "paragraph": para_idx,
                        "text": text[:80]
                    })

        # Check for missing chapters in sequence
        missing_chapters = []
        if chapter_numbers_found:
            chapter_numbers_found.sort()
            expected = set(range(1, max(chapter_numbers_found) + 1))
            found = set(chapter_numbers_found)
            missing_chapters = sorted(expected - found)

            for missing in missing_chapters:
                issues.append({
                    "type": "missing_chapter",
                    "severity": "error",
                    "message": f"Chapter {missing} appears to be missing from sequence",
                    "expected_between": f"Chapter {missing-1} and Chapter {missing+1}" if missing > 1 else f"Before Chapter {missing+1}"
                })

        # Check for duplicate chapter numbers
        from collections import Counter
        chapter_counts = Counter(chapter_numbers_found)
        duplicates = {num: count for num, count in chapter_counts.items() if count > 1}
        for chapter_num, count in duplicates.items():
            issues.append({
                "type": "duplicate_chapter",
                "severity": "warning",
                "message": f"Chapter {chapter_num} appears {count} times"
            })

        # Summary of heading style usage
        heading_summary = {k: v for k, v in heading_styles.items()
                           if "heading" in k.lower() or "title" in k.lower()}

        return {
            "issues": issues,
            "warnings": warnings,
            "chapter_analysis": {
                "total_chapters": len(chapter_numbers_found),
                "chapters_with_heading_style": len(chapters_by_style["heading"]),
                "chapters_without_heading_style": len(chapters_by_style["other"]),
                "missing_chapters": missing_chapters,
                "duplicate_chapters": list(duplicates.keys()),
                "chapter_details": chapters_by_style
            },
            "style_usage": heading_summary,
            "health_score": self._calculate_doc_health_score(issues, warnings),
            "analysis_time": round(time.time() - start_time, 3)
        }

    def _calculate_doc_health_score(self, issues: list, warnings: list) -> dict:
        """Calculate document health score based on issues found."""
        score = 100

        for issue in issues:
            if issue.get("severity") == "error":
                score -= 10
            elif issue.get("severity") == "warning":
                score -= 5

        for _ in warnings:
            score -= 2

        score = max(0, min(100, score))

        if score >= 90:
            rating = "excellent"
        elif score >= 70:
            rating = "good"
        elif score >= 50:
            rating = "fair"
        else:
            rating = "needs attention"

        return {"score": score, "rating": rating}

    @mcp_tool(
        name="search_document",
        description="Search for text within a Word document and return matches with surrounding context and location information."
    )
    @handle_office_errors("Document search")
    async def search_document(
        self,
        file_path: str = Field(description="Path to Word document or URL"),
        query: str = Field(description="Text to search for (case-insensitive)"),
        context_chars: int = Field(default=100, description="Number of characters of context before and after match"),
        max_results: int = Field(default=20, description="Maximum number of results to return")
    ) -> dict[str, Any]:
        """Search document for text with context."""
        from docx import Document

        start_time = time.time()
        local_path = await resolve_office_file_path(file_path)

        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        doc = Document(local_path)
        query_lower = query.lower()

        results = []
        current_chapter = None
        current_section = None

        for para_idx, para in enumerate(doc.paragraphs):
            text = para.text
            style_name = para.style.name if para.style else ""
            style_lower = style_name.lower()

            # Track current chapter/section for context
            if "heading" in style_lower or "title" in style_lower:
                if "1" in style_name or "title" in style_lower:
                    current_chapter = text.strip()[:80]
                    current_section = None
                else:
                    current_section = text.strip()[:80]

            # Search for matches
            text_lower = text.lower()
            search_start = 0

            while True:
                pos = text_lower.find(query_lower, search_start)
                if pos == -1:
                    break

                if len(results) >= max_results:
                    break

                # Extract context
                context_start = max(0, pos - context_chars)
                context_end = min(len(text), pos + len(query) + context_chars)

                context = text[context_start:context_end]
                if context_start > 0:
                    context = "..." + context
                if context_end < len(text):
                    context = context + "..."

                results.append({
                    "paragraph_index": para_idx,
                    "position": pos,
                    "context": context,
                    "chapter": current_chapter,
                    "section": current_section,
                    "style": style_name
                })

                search_start = pos + 1

            if len(results) >= max_results:
                break

        return {
            "query": query,
            "total_matches": len(results),
            "results": results,
            "search_time": round(time.time() - start_time, 3),
            "truncated": len(results) >= max_results
        }

    @mcp_tool(
        name="extract_entities",
        description="Extract named entities (people, places, organizations) from a Word document using pattern-based recognition. Great for identifying key characters, locations, and institutions mentioned in the text."
    )
    @handle_office_errors("Entity extraction")
    async def extract_entities(
        self,
        file_path: str = Field(description="Path to Word document or URL"),
        entity_types: str = Field(default="all", description="Entity types to extract: 'all', 'people', 'places', 'organizations', or comma-separated combination"),
        min_occurrences: int = Field(default=1, description="Minimum occurrences for an entity to be included"),
        include_context: bool = Field(default=True, description="Include sample context for each entity")
    ) -> dict[str, Any]:
        """Extract named entities from document using pattern-based recognition."""
        from docx import Document
        from collections import defaultdict
        import re

        start_time = time.time()
        local_path = await resolve_office_file_path(file_path)

        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        doc = Document(local_path)

        # Parse entity types to extract
        if entity_types == "all":
            extract_types = {"people", "places", "organizations"}
        else:
            extract_types = set(t.strip().lower() for t in entity_types.split(","))

        # Entity containers with context tracking
        entities = {
            "people": defaultdict(lambda: {"count": 0, "contexts": []}),
            "places": defaultdict(lambda: {"count": 0, "contexts": []}),
            "organizations": defaultdict(lambda: {"count": 0, "contexts": []})
        }

        # Patterns for entity detection
        # Titles indicating people
        title_pattern = re.compile(
            r'\b(Dr\.?|Mr\.?|Mrs\.?|Ms\.?|Miss|Professor|Prof\.?|Sister|Father|Rev\.?|'
            r'President|Director|Nurse|RN|LPN|MD)\s+([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)',
            re.IGNORECASE
        )

        # Organization patterns
        org_suffixes = re.compile(
            r'\b([A-Z][a-zA-Z\s\'\-]+(?:Hospital|Medical Center|Center|Clinic|University|'
            r'College|School|Association|Institute|Foundation|Department|Administration|'
            r'Committee|Board|Agency|Service|Company|Inc|Corp|LLC|VA|ANA))\b'
        )

        # Place patterns (cities, states, geographic locations)
        place_patterns = re.compile(
            r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*),\s*((?:[A-Z]{2}|[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*))\b|'
            r'\b((?:North|South|East|West)\s+[A-Z][a-z]+)\b|'
            r'\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)*)\s+(?:City|County|State|Valley|Mountain|River|Lake|Island)\b'
        )

        # Known US states for validation
        us_states = {
            'Alabama', 'Alaska', 'Arizona', 'Arkansas', 'California', 'Colorado',
            'Connecticut', 'Delaware', 'Florida', 'Georgia', 'Hawaii', 'Idaho',
            'Illinois', 'Indiana', 'Iowa', 'Kansas', 'Kentucky', 'Louisiana',
            'Maine', 'Maryland', 'Massachusetts', 'Michigan', 'Minnesota',
            'Mississippi', 'Missouri', 'Montana', 'Nebraska', 'Nevada',
            'New Hampshire', 'New Jersey', 'New Mexico', 'New York',
            'North Carolina', 'North Dakota', 'Ohio', 'Oklahoma', 'Oregon',
            'Pennsylvania', 'Rhode Island', 'South Carolina', 'South Dakota',
            'Tennessee', 'Texas', 'Utah', 'Vermont', 'Virginia', 'Washington',
            'West Virginia', 'Wisconsin', 'Wyoming', 'DC', 'ID', 'WA', 'NY',
            'CA', 'ND', 'MN', 'IA', 'MT', 'OR', 'NV', 'AZ', 'NM', 'CO', 'WY'
        }

        # Common first names for better people detection
        common_titles = {'dr', 'mr', 'mrs', 'ms', 'miss', 'professor', 'prof',
                         'sister', 'father', 'rev', 'president', 'director', 'nurse'}

        current_chapter = "Document Start"

        for para_idx, para in enumerate(doc.paragraphs):
            text = para.text
            style_name = para.style.name if para.style else ""

            # Track chapters for context
            if "heading" in style_name.lower() and "1" in style_name:
                current_chapter = text.strip()[:60]

            # Skip very short paragraphs
            if len(text) < 10:
                continue

            # Extract people
            if "people" in extract_types:
                for match in title_pattern.finditer(text):
                    title = match.group(1)
                    name = match.group(2).strip()
                    full_name = f"{title} {name}".strip()

                    # Clean up the name
                    if len(name) >= 2:
                        entities["people"][full_name]["count"] += 1
                        if include_context and len(entities["people"][full_name]["contexts"]) < 3:
                            # Get surrounding context
                            start = max(0, match.start() - 30)
                            end = min(len(text), match.end() + 50)
                            context = text[start:end].strip()
                            entities["people"][full_name]["contexts"].append({
                                "text": f"...{context}...",
                                "chapter": current_chapter,
                                "paragraph": para_idx
                            })

                # Also look for standalone capitalized names after verbs
                name_after_verb = re.finditer(
                    r'\b(?:said|told|asked|replied|answered|explained|noted|added|mentioned)\s+'
                    r'([A-Z][a-z]+(?:\s+[A-Z][a-z]+)?)\b',
                    text
                )
                for match in name_after_verb:
                    name = match.group(1).strip()
                    if len(name) >= 3 and name not in us_states:
                        entities["people"][name]["count"] += 1
                        if include_context and len(entities["people"][name]["contexts"]) < 3:
                            start = max(0, match.start() - 20)
                            end = min(len(text), match.end() + 40)
                            context = text[start:end].strip()
                            entities["people"][name]["contexts"].append({
                                "text": f"...{context}...",
                                "chapter": current_chapter,
                                "paragraph": para_idx
                            })

            # Extract organizations
            if "organizations" in extract_types:
                for match in org_suffixes.finditer(text):
                    org_name = match.group(1).strip()
                    if len(org_name) >= 5:
                        entities["organizations"][org_name]["count"] += 1
                        if include_context and len(entities["organizations"][org_name]["contexts"]) < 3:
                            start = max(0, match.start() - 20)
                            end = min(len(text), match.end() + 40)
                            context = text[start:end].strip()
                            entities["organizations"][org_name]["contexts"].append({
                                "text": f"...{context}...",
                                "chapter": current_chapter,
                                "paragraph": para_idx
                            })

            # Extract places
            if "places" in extract_types:
                for match in place_patterns.finditer(text):
                    # Try different capture groups
                    place = None
                    if match.group(1) and match.group(2):  # City, State pattern
                        city = match.group(1).strip()
                        state = match.group(2).strip()
                        if state in us_states or len(state) == 2:
                            place = f"{city}, {state}"
                    elif match.group(3):  # Directional places
                        place = match.group(3).strip()
                    elif match.group(4):  # Geographic features
                        place = match.group(4).strip()

                    if place and len(place) >= 3:
                        entities["places"][place]["count"] += 1
                        if include_context and len(entities["places"][place]["contexts"]) < 3:
                            start = max(0, match.start() - 20)
                            end = min(len(text), match.end() + 40)
                            context = text[start:end].strip()
                            entities["places"][place]["contexts"].append({
                                "text": f"...{context}...",
                                "chapter": current_chapter,
                                "paragraph": para_idx
                            })

        # Filter by minimum occurrences and prepare output
        def filter_and_sort(entity_dict, min_count):
            filtered = []
            for name, data in entity_dict.items():
                if data["count"] >= min_count:
                    entry = {
                        "name": name,
                        "occurrences": data["count"]
                    }
                    if include_context and data["contexts"]:
                        entry["sample_contexts"] = data["contexts"]
                    filtered.append(entry)
            return sorted(filtered, key=lambda x: x["occurrences"], reverse=True)

        result = {
            "entities": {},
            "summary": {
                "total_entities": 0,
                "by_type": {}
            },
            "extraction_time": round(time.time() - start_time, 3)
        }

        for entity_type in extract_types:
            if entity_type in entities:
                filtered = filter_and_sort(entities[entity_type], min_occurrences)
                result["entities"][entity_type] = filtered
                result["summary"]["by_type"][entity_type] = len(filtered)
                result["summary"]["total_entities"] += len(filtered)

        return result

    @mcp_tool(
        name="get_chapter_summaries",
        description="Get brief summaries/previews of each chapter in a Word document. Extracts the opening sentences of each chapter to give a quick overview of content."
    )
    @handle_office_errors("Chapter summaries")
    async def get_chapter_summaries(
        self,
        file_path: str = Field(description="Path to Word document or URL"),
        sentences_per_chapter: int = Field(default=3, description="Number of opening sentences to include per chapter"),
        include_word_counts: bool = Field(default=True, description="Include word count for each chapter")
    ) -> dict[str, Any]:
        """Extract chapter summaries/previews from document."""
        from docx import Document
        import re

        start_time = time.time()
        local_path = await resolve_office_file_path(file_path)

        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        doc = Document(local_path)

        chapters = []
        current_chapter = None
        chapter_text = []
        chapter_word_count = 0
        chapter_pattern = re.compile(r'^chapter\s*(\d+)', re.IGNORECASE)

        def extract_preview(text_paragraphs, num_sentences):
            """Extract first N sentences from collected paragraphs."""
            full_text = " ".join(text_paragraphs)
            # Simple sentence splitting
            sentences = re.split(r'(?<=[.!?])\s+', full_text)
            preview_sentences = sentences[:num_sentences]
            return " ".join(preview_sentences).strip()

        def save_current_chapter():
            """Save the current chapter's data."""
            nonlocal current_chapter, chapter_text, chapter_word_count
            if current_chapter:
                preview = extract_preview(chapter_text, sentences_per_chapter)
                chapter_data = {
                    "chapter_number": current_chapter["number"],
                    "title": current_chapter["title"],
                    "paragraph_index": current_chapter["paragraph_index"],
                    "preview": preview if preview else "(No text content found)",
                }
                if include_word_counts:
                    chapter_data["word_count"] = chapter_word_count
                chapters.append(chapter_data)

        for para_idx, para in enumerate(doc.paragraphs):
            text = para.text.strip()
            style_name = para.style.name if para.style else ""

            # Check if this is a chapter heading
            chapter_match = chapter_pattern.match(text)
            if chapter_match:
                # Save previous chapter first
                save_current_chapter()

                # Start new chapter
                current_chapter = {
                    "number": int(chapter_match.group(1)),
                    "title": text[:100],
                    "paragraph_index": para_idx
                }
                chapter_text = []
                chapter_word_count = 0
            elif current_chapter:
                # Accumulate text for current chapter
                if text:
                    word_count = len(text.split())
                    chapter_word_count += word_count
                    # Only collect first portion of text for preview
                    if len(" ".join(chapter_text)) < 1000:
                        chapter_text.append(text)

        # Don't forget the last chapter
        save_current_chapter()

        # Calculate statistics
        total_words = sum(c.get("word_count", 0) for c in chapters)
        avg_words = total_words // len(chapters) if chapters else 0

        return {
            "chapters": chapters,
            "summary": {
                "total_chapters": len(chapters),
                "total_words": total_words,
                "average_words_per_chapter": avg_words,
                "shortest_chapter": min((c for c in chapters), key=lambda x: x.get("word_count", 0), default=None),
                "longest_chapter": max((c for c in chapters), key=lambda x: x.get("word_count", 0), default=None)
            },
            "extraction_time": round(time.time() - start_time, 3)
        }

    @mcp_tool(
        name="save_reading_progress",
        description="Save your reading progress in a Word document. Creates a bookmark file to track which chapter/paragraph you're on, so you can resume reading later."
    )
    @handle_office_errors("Save reading progress")
    async def save_reading_progress(
        self,
        file_path: str = Field(description="Path to Word document"),
        chapter_number: int = Field(default=1, description="Current chapter number"),
        paragraph_index: int = Field(default=0, description="Current paragraph index"),
        notes: str = Field(default="", description="Optional notes about where you left off")
    ) -> dict[str, Any]:
        """Save reading progress to a bookmark file."""
        import json
        from datetime import datetime

        local_path = await resolve_office_file_path(file_path)

        validation = await validate_office_file(local_path)
        if not validation["is_valid"]:
            raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")

        # Create bookmark file path (same location as document)
        doc_dir = os.path.dirname(local_path)
        doc_name = os.path.splitext(os.path.basename(local_path))[0]
        bookmark_path = os.path.join(doc_dir, f".{doc_name}.reading_progress.json")

        # Load existing bookmarks or create new
        bookmarks = {"history": []}
        if os.path.exists(bookmark_path):
            try:
                with open(bookmark_path, 'r') as f:
                    bookmarks = json.load(f)
            except (json.JSONDecodeError, IOError):
                bookmarks = {"history": []}

        # Create new bookmark entry
        bookmark = {
            "timestamp": datetime.now().isoformat(),
            "chapter": chapter_number,
            "paragraph_index": paragraph_index,
            "notes": notes
        }

        # Update current position and add to history
        bookmarks["current"] = bookmark
        bookmarks["document"] = os.path.basename(local_path)
        bookmarks["history"].append(bookmark)

        # Keep only last 50 history entries
        if len(bookmarks["history"]) > 50:
            bookmarks["history"] = bookmarks["history"][-50:]

        # Save bookmark file
        with open(bookmark_path, 'w') as f:
            json.dump(bookmarks, f, indent=2)

        return {
            "saved": True,
            "bookmark_file": bookmark_path,
            "position": {
                "chapter": chapter_number,
                "paragraph_index": paragraph_index
            },
            "notes": notes,
            "timestamp": bookmark["timestamp"],
            "history_entries": len(bookmarks["history"])
        }

@mcp_tool(
|
|
||||||
name="get_reading_progress",
|
|
||||||
description="Retrieve your saved reading progress for a Word document. Shows where you left off and your reading history."
|
|
||||||
)
|
|
||||||
@handle_office_errors("Get reading progress")
|
|
||||||
async def get_reading_progress(
|
|
||||||
self,
|
|
||||||
file_path: str = Field(description="Path to Word document")
|
|
||||||
) -> dict[str, Any]:
|
|
||||||
"""Retrieve saved reading progress from bookmark file."""
|
|
||||||
import json
|
|
||||||
|
|
||||||
local_path = await resolve_office_file_path(file_path)
|
|
||||||
|
|
||||||
validation = await validate_office_file(local_path)
|
|
||||||
if not validation["is_valid"]:
|
|
||||||
raise OfficeFileError(f"Invalid file: {', '.join(validation['errors'])}")
|
|
||||||
|
|
||||||
# Find bookmark file
|
|
||||||
doc_dir = os.path.dirname(local_path)
|
|
||||||
doc_name = os.path.splitext(os.path.basename(local_path))[0]
|
|
||||||
bookmark_path = os.path.join(doc_dir, f".{doc_name}.reading_progress.json")
|
|
||||||
|
|
||||||
if not os.path.exists(bookmark_path):
|
|
||||||
return {
|
|
||||||
"has_progress": False,
|
|
||||||
"message": "No reading progress saved for this document. Use save_reading_progress to save your position."
|
|
||||||
}
|
|
||||||
|
|
||||||
# Load bookmarks
|
|
||||||
try:
|
|
||||||
with open(bookmark_path, 'r') as f:
|
|
||||||
bookmarks = json.load(f)
|
|
||||||
except (json.JSONDecodeError, IOError) as e:
|
|
||||||
return {
|
|
||||||
"has_progress": False,
|
|
||||||
"error": f"Could not read bookmark file: {str(e)}"
|
|
||||||
}
|
|
||||||
|
|
||||||
current = bookmarks.get("current", {})
|
|
||||||
history = bookmarks.get("history", [])
|
|
||||||
|
|
||||||
return {
|
|
||||||
"has_progress": True,
|
|
||||||
"document": bookmarks.get("document", os.path.basename(local_path)),
|
|
||||||
"current_position": {
|
|
||||||
"chapter": current.get("chapter"),
|
|
||||||
"paragraph_index": current.get("paragraph_index"),
|
|
||||||
"notes": current.get("notes", ""),
|
|
||||||
"last_read": current.get("timestamp")
|
|
||||||
},
|
|
||||||
"reading_sessions": len(history),
|
|
||||||
"recent_history": history[-5:] if history else [],
|
|
||||||
"bookmark_file": bookmark_path
|
|
||||||
}
|
|
||||||
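# For reference, the bookmark file written and read above is plain JSON. A minimal
# sketch of its shape (values are illustrative, not taken from a real run):
#
# {
#   "document": "manuscript.docx",
#   "current": {"timestamp": "2025-01-01T12:00:00", "chapter": 3, "paragraph_index": 42, "notes": ""},
#   "history": [
#     {"timestamp": "2025-01-01T12:00:00", "chapter": 3, "paragraph_index": 42, "notes": ""}
#   ]
# }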
@ -14,7 +14,6 @@ import os
 import tempfile

 from fastmcp import FastMCP
-from fastmcp.prompts import Prompt

 from .mixins import UniversalMixin, WordMixin, ExcelMixin, PowerPointMixin

@ -40,252 +39,14 @@ powerpoint_mixin.register_all(app, prefix="")
 # Note: All helper functions are still available from server_legacy.py for import by mixins
 # This allows gradual migration while maintaining backward compatibility


-# ==================== MCP Prompts ====================
-# Prompts help users understand how to use tools effectively
-# Organized from basic to advanced multi-step workflows
-
-@app.prompt(
-    name="explore-document",
-    description="Basic: Start exploring a new document - get structure, identify key content"
-)
-def prompt_explore_document(file_path: str = "") -> list:
-    """Guide for exploring a new Word document."""
-    path_hint = f"the document at `{file_path}`" if file_path else "your document"
-    return [
-        {
-            "role": "user",
-            "content": f"""I want to explore {path_hint}. Please help me understand it by:
-
-1. First, use `get_document_outline` to show me the document structure (chapters, sections, headings)
-2. Then use `check_style_consistency` to identify any formatting issues or problems
-3. Finally, give me a summary of what the document contains based on the outline
-
-This will help me understand what I'm working with before diving into the content."""
-        }
-    ]
-
-
-@app.prompt(
-    name="find-character",
-    description="Basic: Find all mentions of a person/character in a document"
-)
-def prompt_find_character(file_path: str = "", character_name: str = "") -> list:
-    """Guide for finding character mentions."""
-    path_hint = f"in `{file_path}`" if file_path else "in my document"
-    name_hint = f'"{character_name}"' if character_name else "a character"
-    return [
-        {
-            "role": "user",
-            "content": f"""Help me find all mentions of {name_hint} {path_hint}.
-
-Use `search_document` to find occurrences with context. I want to see:
-- Each mention with surrounding text
-- Which chapter each mention appears in
-- A count of total appearances
-
-This will help me track the character's journey through the narrative."""
-        }
-    ]
-
-
-@app.prompt(
-    name="chapter-preview",
-    description="Basic: Get a quick preview of each chapter without reading the full content"
-)
-def prompt_chapter_preview(file_path: str = "") -> list:
-    """Guide for getting chapter previews."""
-    path_hint = f"from `{file_path}`" if file_path else ""
-    return [
-        {
-            "role": "user",
-            "content": f"""I want a quick preview of each chapter {path_hint}.
-
-Use `get_chapter_summaries` with 3-4 sentences per chapter to give me a preview of what each chapter covers. Include word counts so I know which chapters are longest.
-
-This gives me a roadmap before I start reading in depth."""
-        }
-    ]
-
-
-@app.prompt(
-    name="resume-reading",
-    description="Intermediate: Check where you left off and continue reading"
-)
-def prompt_resume_reading(file_path: str = "") -> list:
-    """Guide for resuming reading."""
-    path_hint = f"in `{file_path}`" if file_path else ""
-    return [
-        {
-            "role": "user",
-            "content": f"""I want to continue reading where I left off {path_hint}.
-
-1. First, use `get_reading_progress` to see where I was
-2. Then use `convert_to_markdown` with `chapter_name` set to that chapter to show me the content
-3. When I tell you where to stop, use `save_reading_progress` to bookmark my position
-
-This is my reading workflow for long documents."""
-        }
-    ]
-
-
-@app.prompt(
-    name="document-analysis",
-    description="Intermediate: Comprehensive analysis - structure, entities, and key information"
-)
-def prompt_document_analysis(file_path: str = "") -> list:
-    """Guide for comprehensive document analysis."""
-    path_hint = f"the document `{file_path}`" if file_path else "my document"
-    return [
-        {
-            "role": "user",
-            "content": f"""Perform a comprehensive analysis of {path_hint}:
-
-1. **Structure Analysis** (`get_document_outline`): Map out all chapters, sections, and headings
-2. **Quality Check** (`check_style_consistency`): Identify any formatting issues
-3. **Entity Extraction** (`extract_entities`): Find all people, places, and organizations mentioned
-4. **Chapter Overview** (`get_chapter_summaries`): Generate previews of each chapter
-
-Summarize the findings in a report format. This gives me a complete picture of the document."""
-        }
-    ]
-
-
-@app.prompt(
-    name="character-journey",
-    description="Advanced: Track a character's complete journey through a document"
-)
-def prompt_character_journey(file_path: str = "", character_name: str = "") -> list:
-    """Guide for tracking a character's journey."""
-    path_hint = f"in `{file_path}`" if file_path else ""
-    name_hint = f'"{character_name}"' if character_name else "the main character"
-    return [
-        {
-            "role": "user",
-            "content": f"""Help me track {name_hint}'s complete journey {path_hint}:
-
-**Step 1 - Get Context**
-Use `get_document_outline` to understand the chapter structure
-
-**Step 2 - Find All Mentions**
-Use `search_document` to find every mention of the character with context
-
-**Step 3 - Analyze by Chapter**
-For each chapter where the character appears, use `convert_to_markdown` with `chapter_name` to extract the relevant sections
-
-**Step 4 - Summarize the Journey**
-Create a timeline or narrative summary of the character's arc through the story
-
-This multi-step workflow helps me understand a character's complete narrative arc."""
-        }
-    ]
-
-
-@app.prompt(
-    name="document-comparison",
-    description="Advanced: Compare entities and themes between chapters or sections"
-)
-def prompt_document_comparison(file_path: str = "") -> list:
-    """Guide for comparing document sections."""
-    path_hint = f"from `{file_path}`" if file_path else ""
-    return [
-        {
-            "role": "user",
-            "content": f"""Help me compare different sections of the document {path_hint}:
-
-**Step 1 - Get Structure**
-Use `get_document_outline` to identify all chapters/sections
-
-**Step 2 - Extract Entities by Section**
-Use `extract_entities` with different chapters to see which characters/places appear where
-
-**Step 3 - Get Chapter Summaries**
-Use `get_chapter_summaries` to understand the focus of each section
-
-**Step 4 - Compare and Contrast**
-Based on the data, identify:
-- Which characters appear in which chapters
-- How locations shift through the narrative
-- Patterns in entity distribution
-
-Create a comparison matrix or analysis."""
-        }
-    ]
-
-
-@app.prompt(
-    name="full-reading-session",
-    description="Advanced: Complete guided reading session with bookmarking"
-)
-def prompt_full_reading_session(file_path: str = "", start_chapter: int = 1) -> list:
-    """Guide for a complete reading session."""
-    path_hint = f"of `{file_path}`" if file_path else ""
-    return [
-        {
-            "role": "user",
-            "content": f"""Let's do a guided reading session {path_hint}:
-
-**Setup Phase**
-1. Use `get_reading_progress` to check if I have a saved position
-2. Use `get_document_outline` to show the chapter list
-3. Use `check_style_consistency` to flag any document issues
-
-**Reading Phase**
-4. Use `convert_to_markdown` with `chapter_name="Chapter {start_chapter}"` to show that chapter
-5. When I'm done, I'll say "stop at paragraph X" and you use `save_reading_progress`
-
-**Analysis Phase (Optional)**
-6. Use `extract_entities` with `entity_types="people"` to show who appears in what I've read
-7. Use `search_document` if I want to find specific references
-
-This creates an interactive, bookmark-enabled reading experience."""
-        }
-    ]
-
-
-@app.prompt(
-    name="manuscript-review",
-    description="Advanced: Complete manuscript review workflow for editors"
-)
-def prompt_manuscript_review(file_path: str = "") -> list:
-    """Guide for comprehensive manuscript review."""
-    path_hint = f"manuscript at `{file_path}`" if file_path else "the manuscript"
-    return [
-        {
-            "role": "user",
-            "content": f"""Help me conduct a complete editorial review of {path_hint}:
-
-**Phase 1: Structure Assessment**
-1. `get_document_outline` - Map the complete structure
-2. `check_style_consistency` - Identify formatting issues, missing chapters, style problems
-3. Report any structural issues found
-
-**Phase 2: Content Analysis**
-4. `get_chapter_summaries` - Get overview of each chapter's content
-5. `extract_entities` - Extract all characters, locations, organizations
-6. Flag any inconsistencies (characters who appear then disappear, etc.)
-
-**Phase 3: Deep Dive**
-7. For each chapter with issues, use `convert_to_markdown` to review
-8. Use `search_document` to verify specific details if needed
-9. Document findings with chapter numbers and paragraph indices
-
-**Phase 4: Final Report**
-Compile all findings into an editorial report with:
-- Structure issues and recommendations
-- Character/entity tracking
-- Suggested fixes with specific locations
-
-This is a complete editorial workflow for manuscript review."""
-        }
-    ]


 def main():
     """Entry point for the MCP Office Tools server."""
-    # CRITICAL: show_banner=False is required for stdio transport!
-    # FastMCP's banner prints ASCII art to stdout which breaks JSON-RPC protocol
-    app.run(show_banner=False)
+    import asyncio
+
+    async def run_server():
+        await app.run_stdio_async()
+
+    asyncio.run(run_server())


 if __name__ == "__main__":
     main()
2209 src/mcp_office_tools/server_legacy.py (Normal file; file diff suppressed because it is too large)
2209 src/mcp_office_tools/server_monolithic.py (Normal file; file diff suppressed because it is too large)
@ -27,48 +27,6 @@ from .decorators import (
     handle_office_errors
 )

-from .processing import (
-    TEMP_DIR,
-    DEBUG,
-    _extract_basic_metadata,
-    _calculate_health_score,
-    _get_health_recommendations,
-    _smart_truncate_content,
-    _parse_page_range,
-    _get_processing_recommendation,
-)
-
-from .word_processing import (
-    _extract_word_text,
-    _extract_word_images,
-    _extract_word_metadata,
-    _convert_docx_to_markdown,
-    _convert_docx_with_python_docx,
-    _convert_doc_to_markdown,
-    _get_ultra_fast_summary,
-    _find_bookmark_content_range,
-    _find_chapter_content_range,
-    _get_available_headings,
-    _has_page_break,
-    _analyze_document_size,
-    _paragraph_to_markdown,
-    _table_to_markdown,
-    _html_to_markdown,
-    _extract_markdown_structure,
-)
-
-from .excel_processing import (
-    _extract_excel_text,
-    _extract_excel_images,
-    _extract_excel_metadata,
-)
-
-from .powerpoint_processing import (
-    _extract_powerpoint_text,
-    _extract_powerpoint_images,
-    _extract_powerpoint_metadata,
-)
-
 __all__ = [
     # Validation
     "OfficeFileError",
@ -1,203 +0,0 @@
"""Excel document processing utilities.

This module provides helper functions for extracting text, images, and metadata
from Excel documents (.xlsx, .xls, .xlsm, .csv) with intelligent method selection
and fallback support.
"""

from typing import Any

from . import OfficeFileError


async def _extract_excel_text(file_path: str, extension: str, preserve_formatting: bool, method: str) -> dict[str, Any]:
    """Extract text from Excel documents."""
    methods_tried = []

    if extension == ".csv":
        # CSV handling
        import pandas as pd
        try:
            df = pd.read_csv(file_path)
            text = df.to_string()
            return {
                "text": text,
                "method_used": "pandas",
                "methods_tried": ["pandas"],
                "formatted_sections": [{"type": "table", "data": df.to_dict()}] if preserve_formatting else []
            }
        except Exception as e:
            raise OfficeFileError(f"CSV processing failed: {str(e)}")

    # Excel file handling
    text = ""
    formatted_sections = []
    method_used = None

    method_order = ["openpyxl", "pandas", "xlrd"] if extension == ".xlsx" else ["xlrd", "pandas", "openpyxl"]

    for method_name in method_order:
        try:
            methods_tried.append(method_name)

            if method_name == "openpyxl" and extension in [".xlsx", ".xlsm"]:
                import openpyxl
                wb = openpyxl.load_workbook(file_path, data_only=True)

                text_parts = []
                for sheet_name in wb.sheetnames:
                    ws = wb[sheet_name]
                    text_parts.append(f"Sheet: {sheet_name}")

                    for row in ws.iter_rows(values_only=True):
                        row_text = "\t".join(str(cell) if cell is not None else "" for cell in row)
                        if row_text.strip():
                            text_parts.append(row_text)

                    if preserve_formatting:
                        formatted_sections.append({
                            "type": "worksheet",
                            "name": sheet_name,
                            "data": [[str(cell.value) if cell.value is not None else "" for cell in row] for row in ws.iter_rows()]
                        })

                text = "\n".join(text_parts)
                method_used = "openpyxl"
                break

            elif method_name == "pandas":
                import pandas as pd

                if extension in [".xlsx", ".xlsm"]:
                    dfs = pd.read_excel(file_path, sheet_name=None)
                else:  # .xls
                    dfs = pd.read_excel(file_path, sheet_name=None, engine='xlrd')

                text_parts = []
                for sheet_name, df in dfs.items():
                    text_parts.append(f"Sheet: {sheet_name}")
                    text_parts.append(df.to_string())

                    if preserve_formatting:
                        formatted_sections.append({
                            "type": "dataframe",
                            "name": sheet_name,
                            "data": df.to_dict()
                        })

                text = "\n\n".join(text_parts)
                method_used = "pandas"
                break

            elif method_name == "xlrd" and extension == ".xls":
                import xlrd
                wb = xlrd.open_workbook(file_path)

                text_parts = []
                for sheet in wb.sheets():
                    text_parts.append(f"Sheet: {sheet.name}")

                    for row_idx in range(sheet.nrows):
                        row = sheet.row_values(row_idx)
                        row_text = "\t".join(str(cell) for cell in row)
                        text_parts.append(row_text)

                text = "\n".join(text_parts)
                method_used = "xlrd"
                break

        except ImportError:
            continue
        except Exception:
            continue

    if not method_used:
        raise OfficeFileError(f"Failed to extract text using methods: {', '.join(methods_tried)}")

    return {
        "text": text,
        "method_used": method_used,
        "methods_tried": methods_tried,
        "formatted_sections": formatted_sections
    }


async def _extract_excel_images(file_path: str, extension: str, output_format: str, min_width: int, min_height: int) -> list[dict[str, Any]]:
    """Extract images from Excel documents."""
    import io
    import os
    import tempfile
    import zipfile

    from PIL import Image

    images = []
    TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir())

    if extension in [".xlsx", ".xlsm"]:
        try:
            with zipfile.ZipFile(file_path, 'r') as zip_file:
                # Look for images in media folder
                image_files = [f for f in zip_file.namelist() if f.startswith('xl/media/')]

                for i, img_path in enumerate(image_files):
                    try:
                        img_data = zip_file.read(img_path)
                        img = Image.open(io.BytesIO(img_data))

                        # Size filtering
                        if img.width >= min_width and img.height >= min_height:
                            # Save to temp file
                            temp_path = os.path.join(TEMP_DIR, f"excel_image_{i}.{output_format}")
                            img.save(temp_path, format=output_format.upper())

                            images.append({
                                "index": i,
                                "filename": os.path.basename(img_path),
                                "path": temp_path,
                                "width": img.width,
                                "height": img.height,
                                "format": img.format,
                                "size_bytes": len(img_data)
                            })
                    except Exception:
                        continue

        except Exception as e:
            raise OfficeFileError(f"Excel image extraction failed: {str(e)}")

    return images


async def _extract_excel_metadata(file_path: str, extension: str) -> dict[str, Any]:
    """Extract Excel-specific metadata."""
    metadata = {"type": "excel", "extension": extension}

    if extension in [".xlsx", ".xlsm"]:
        try:
            import openpyxl
            wb = openpyxl.load_workbook(file_path)

            props = wb.properties
            metadata.update({
                "title": props.title,
                "creator": props.creator,
                "subject": props.subject,
                "description": props.description,
                "keywords": props.keywords,
                "created": str(props.created) if props.created else None,
                "modified": str(props.modified) if props.modified else None
            })

            # Workbook structure
            metadata.update({
                "worksheet_count": len(wb.worksheets),
                "worksheet_names": wb.sheetnames,
                "has_charts": any(len(ws._charts) > 0 for ws in wb.worksheets),
                "has_images": any(len(ws._images) > 0 for ws in wb.worksheets)
            })

        except Exception:
            pass

    return metadata
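# Minimal usage sketch for the text extractor above (illustrative; assumes an asyncio
# context and an existing workbook at the given path, which are not part of the diff):
#
#   result = await _extract_excel_text("report.xlsx", ".xlsx", False, "auto")
#   print(result["method_used"])  # "openpyxl", or "pandas"/"xlrd" after fallback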
@ -1,177 +0,0 @@
"""PowerPoint document processing utilities.

This module provides helper functions for extracting text, images, and metadata
from PowerPoint documents (.pptx and .ppt files).
"""

import io
import os
import zipfile
from typing import Any

from PIL import Image

from . import OfficeFileError


async def _extract_powerpoint_text(
    file_path: str, extension: str, preserve_formatting: bool, method: str
) -> dict[str, Any]:
    """Extract text from PowerPoint documents."""
    methods_tried = []

    if extension == ".pptx":
        try:
            import pptx

            prs = pptx.Presentation(file_path)

            text_parts = []
            formatted_sections = []

            for slide_num, slide in enumerate(prs.slides, 1):
                slide_text_parts = []

                for shape in slide.shapes:
                    if hasattr(shape, "text") and shape.text:
                        slide_text_parts.append(shape.text)

                slide_text = "\n".join(slide_text_parts)
                text_parts.append(f"Slide {slide_num}:\n{slide_text}")

                if preserve_formatting:
                    formatted_sections.append(
                        {
                            "type": "slide",
                            "number": slide_num,
                            "text": slide_text,
                            "shapes": len(slide.shapes),
                        }
                    )

            text = "\n\n".join(text_parts)

            return {
                "text": text,
                "method_used": "python-pptx",
                "methods_tried": ["python-pptx"],
                "formatted_sections": formatted_sections,
            }

        except ImportError:
            methods_tried.append("python-pptx")
        except Exception:
            methods_tried.append("python-pptx")

    # Legacy .ppt handling would require additional libraries
    if extension == ".ppt":
        raise OfficeFileError(
            "Legacy PowerPoint (.ppt) text extraction requires additional setup"
        )

    raise OfficeFileError(
        f"Failed to extract text using methods: {', '.join(methods_tried)}"
    )


async def _extract_powerpoint_images(
    file_path: str,
    extension: str,
    output_format: str,
    min_width: int,
    min_height: int,
    temp_dir: str,
) -> list[dict[str, Any]]:
    """Extract images from PowerPoint documents."""
    images = []

    if extension == ".pptx":
        try:
            with zipfile.ZipFile(file_path, "r") as zip_file:
                # Look for images in media folder
                image_files = [
                    f for f in zip_file.namelist() if f.startswith("ppt/media/")
                ]

                for i, img_path in enumerate(image_files):
                    try:
                        img_data = zip_file.read(img_path)
                        img = Image.open(io.BytesIO(img_data))

                        # Size filtering
                        if img.width >= min_width and img.height >= min_height:
                            # Save to temp file
                            temp_path = os.path.join(
                                temp_dir, f"powerpoint_image_{i}.{output_format}"
                            )
                            img.save(temp_path, format=output_format.upper())

                            images.append(
                                {
                                    "index": i,
                                    "filename": os.path.basename(img_path),
                                    "path": temp_path,
                                    "width": img.width,
                                    "height": img.height,
                                    "format": img.format,
                                    "size_bytes": len(img_data),
                                }
                            )
                    except Exception:
                        continue

        except Exception as e:
            raise OfficeFileError(f"PowerPoint image extraction failed: {str(e)}")

    return images


async def _extract_powerpoint_metadata(
    file_path: str, extension: str
) -> dict[str, Any]:
    """Extract PowerPoint-specific metadata."""
    metadata = {"type": "powerpoint", "extension": extension}

    if extension == ".pptx":
        try:
            import pptx

            prs = pptx.Presentation(file_path)

            core_props = prs.core_properties
            metadata.update(
                {
                    "title": core_props.title,
                    "author": core_props.author,
                    "subject": core_props.subject,
                    "keywords": core_props.keywords,
                    "comments": core_props.comments,
                    "created": str(core_props.created) if core_props.created else None,
                    "modified": str(core_props.modified)
                    if core_props.modified
                    else None,
                }
            )

            # Presentation structure
            slide_layouts = set()
            total_shapes = 0

            for slide in prs.slides:
                slide_layouts.add(slide.slide_layout.name)
                total_shapes += len(slide.shapes)

            metadata.update(
                {
                    "slide_count": len(prs.slides),
                    "slide_layouts": list(slide_layouts),
                    "total_shapes": total_shapes,
                    "slide_width": prs.slide_width,
                    "slide_height": prs.slide_height,
                }
            )

        except Exception:
            pass

    return metadata
@ -1,228 +0,0 @@
"""Universal processing helper functions for Office documents.

This module contains helper functions used across different document processing
operations including metadata extraction, health scoring, content truncation,
and page range parsing.
"""

import os
import tempfile
from typing import Any

# Configuration
TEMP_DIR = os.environ.get("OFFICE_TEMP_DIR", tempfile.gettempdir())
DEBUG = os.environ.get("DEBUG", "false").lower() == "true"


async def _extract_basic_metadata(file_path: str, extension: str, category: str) -> dict[str, Any]:
    """Extract basic metadata from Office documents."""
    metadata = {"category": category, "extension": extension}

    try:
        if extension in [".docx", ".xlsx", ".pptx"] and category in ["word", "excel", "powerpoint"]:
            import zipfile

            with zipfile.ZipFile(file_path, 'r') as zip_file:
                # Core properties
                if 'docProps/core.xml' in zip_file.namelist():
                    zip_file.read('docProps/core.xml').decode('utf-8')
                    metadata["has_core_properties"] = True

                # App properties
                if 'docProps/app.xml' in zip_file.namelist():
                    zip_file.read('docProps/app.xml').decode('utf-8')
                    metadata["has_app_properties"] = True

    except Exception:
        pass

    return metadata


def _calculate_health_score(validation: dict[str, Any], format_info: dict[str, Any]) -> int:
    """Calculate document health score (1-10)."""
    score = 10

    # Deduct for validation errors
    if not validation["is_valid"]:
        score -= 5

    if validation["errors"]:
        score -= len(validation["errors"]) * 2

    if validation["warnings"]:
        score -= len(validation["warnings"])

    # Deduct for problematic characteristics
    if validation.get("password_protected"):
        score -= 1

    if format_info.get("is_legacy"):
        score -= 1

    structure = format_info.get("structure", {})
    if structure.get("estimated_complexity") == "complex":
        score -= 1

    return max(1, min(10, score))
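# Worked example for the scoring above (illustrative): a valid legacy .doc with no
# errors, one warning, no password, and normal complexity scores
# 10 - 1 (warning) - 1 (legacy format) = 8.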
def _get_health_recommendations(validation: dict[str, Any], format_info: dict[str, Any]) -> list[str]:
    """Get health improvement recommendations."""
    recommendations = []

    if validation["errors"]:
        recommendations.append("Fix validation errors before processing")

    if validation.get("password_protected"):
        recommendations.append("Remove password protection if possible")

    if format_info.get("is_legacy"):
        recommendations.append("Consider converting to modern format (.docx, .xlsx, .pptx)")

    structure = format_info.get("structure", {})
    if structure.get("estimated_complexity") == "complex":
        recommendations.append("Complex document may require specialized processing")

    if not recommendations:
        recommendations.append("Document appears healthy and ready for processing")

    return recommendations


def _smart_truncate_content(content: str, max_chars: int) -> str:
    """Intelligently truncate content while preserving structure and readability."""
    if len(content) <= max_chars:
        return content

    lines = content.split('\n')
    truncated_lines = []
    current_length = 0

    # Try to preserve structure by stopping at a natural break point
    for line in lines:
        line_length = len(line) + 1  # +1 for newline

        # If adding this line would exceed limit
        if current_length + line_length > max_chars:
            # Try to find a good stopping point
            if truncated_lines:
                # Check if we're in the middle of a section
                last_lines = '\n'.join(truncated_lines[-3:]) if len(truncated_lines) >= 3 else '\n'.join(truncated_lines)

                # If we stopped mid-paragraph, remove incomplete paragraph
                if not (line.strip() == '' or line.startswith('#') or line.startswith('|')):
                    # Remove lines until we hit a natural break
                    while truncated_lines and not (
                        truncated_lines[-1].strip() == '' or
                        truncated_lines[-1].startswith('#') or
                        truncated_lines[-1].startswith('|') or
                        truncated_lines[-1].startswith('-') or
                        truncated_lines[-1].startswith('*')
                    ):
                        truncated_lines.pop()
            break

        truncated_lines.append(line)
        current_length += line_length

    # Add truncation notice
    result = '\n'.join(truncated_lines)
    result += f"\n\n---\n**[CONTENT TRUNCATED]**\nShowing {len(result):,} of {len(content):,} characters.\nUse smaller page ranges (e.g., 3-5 pages) for full content without truncation.\n---"

    return result


def _parse_page_range(page_range: str) -> list[int]:
    """Parse page range string into list of page numbers.

    Examples:
        "1-5" -> [1, 2, 3, 4, 5]
        "1,3,5" -> [1, 3, 5]
        "1-3,5,7-9" -> [1, 2, 3, 5, 7, 8, 9]
    """
    pages = set()

    for part in page_range.split(','):
        part = part.strip()
        if '-' in part:
            # Handle range like "1-5"
            start, end = part.split('-', 1)
            try:
                start_num = int(start.strip())
                end_num = int(end.strip())
                pages.update(range(start_num, end_num + 1))
            except ValueError:
                continue
        else:
            # Handle single page like "3"
            try:
                pages.add(int(part))
            except ValueError:
                continue

    return sorted(list(pages))


def _get_processing_recommendation(
    doc_analysis: dict[str, Any],
    page_range: str,
    summary_only: bool
) -> dict[str, Any]:
    """Generate intelligent processing recommendations based on document analysis."""

    estimated_pages = doc_analysis["estimated_pages"]
    content_size = doc_analysis["estimated_content_size"]

    recommendation = {
        "status": "optimal",
        "message": "",
        "suggested_workflow": [],
        "warnings": []
    }

    # Large document recommendations
    if content_size in ["large", "very_large"] and not page_range and not summary_only:
        recommendation["status"] = "suboptimal"
        recommendation["message"] = (
            f"⚠️ Large document detected ({estimated_pages} estimated pages). "
            "Consider using recommended workflow for better performance."
        )
        recommendation["suggested_workflow"] = [
            "1. First: Call with summary_only=true to get document overview and TOC",
            "2. Then: Use page_range to process specific sections (e.g., '1-5', '6-10', '15-20')",
            "3. Recommended: Use 3-8 page chunks to stay under 25k token MCP limit",
            "4. The tool auto-truncates if content is too large, but smaller ranges work better"
        ]
        recommendation["warnings"] = [
            "Page ranges >8 pages may hit 25k token response limit and get truncated",
            "Use smaller page ranges (3-5 pages) for dense content documents",
            "Auto-truncation preserves structure but loses content completeness"
        ]

    # Medium document recommendations
    elif content_size == "medium" and not page_range and not summary_only:
        recommendation["status"] = "caution"
        recommendation["message"] = (
            f"Medium document detected ({estimated_pages} estimated pages). "
            "Consider summary_only=true first if you encounter response size issues."
        )
        recommendation["suggested_workflow"] = [
            "Option 1: Try full processing (current approach)",
            "Option 2: Use summary_only=true first, then page_range if needed"
        ]

    # Optimal usage patterns
    elif summary_only:
        recommendation["message"] = "✅ Excellent! Using summary mode for initial document analysis."
        recommendation["suggested_workflow"] = [
            "After reviewing summary, use page_range to extract specific sections of interest"
        ]

    elif page_range and content_size in ["large", "very_large"]:
        recommendation["message"] = "✅ Perfect! Using page-range processing for efficient extraction."

    elif content_size == "small":
        recommendation["message"] = "✅ Small document - full processing is optimal."

    return recommendation
File diff suppressed because it is too large
@ -64,7 +64,7 @@ class TestMixinArchitecture:
         word = WordMixin()
         word.register_all(app)
         word_tools = len(app._tool_manager._tools) - initial_tool_count - universal_tools
-        assert word_tools == 10 # convert_to_markdown, extract_word_tables, analyze_word_structure, get_document_outline, check_style_consistency, search_document, extract_entities, get_chapter_summaries, save_reading_progress, get_reading_progress
+        assert word_tools == 3 # convert_to_markdown, extract_word_tables, analyze_word_structure

         excel = ExcelMixin()
         excel.register_all(app)
@ -149,8 +149,8 @@ class TestMixinIntegration:
         # Verify no duplicates
         assert len(tool_names) == len(set(tool_names)), "Tool names should be unique"

-        # Verify expected count: 6 universal + 10 word + 3 excel = 19
-        assert len(tool_names) == 19, f"Expected 19 tools, got {len(tool_names)}: {list(tool_names.keys())}"
+        # Verify expected count: 6 universal + 3 word + 3 excel = 12
+        assert len(tool_names) == 12, f"Expected 12 tools, got {len(tool_names)}: {list(tool_names.keys())}"


 if __name__ == "__main__":
@ -28,14 +28,14 @@ class TestWordMixinRegistration:
         mixin.register_all(app)

         assert mixin is not None
-        assert len(app._tool_manager._tools) == 10 # convert_to_markdown, extract_word_tables, analyze_word_structure, get_document_outline, check_style_consistency, search_document, extract_entities, get_chapter_summaries, save_reading_progress, get_reading_progress
+        assert len(app._tool_manager._tools) == 3 # convert_to_markdown, extract_word_tables, analyze_word_structure

     def test_tool_names_registered(self):
         """Test that Word-specific tools are registered."""
         app = FastMCP("Test Word")
         WordMixin().register_all(app)

-        expected_tools = {"convert_to_markdown", "extract_word_tables", "analyze_word_structure", "get_document_outline", "check_style_consistency", "search_document", "extract_entities", "get_chapter_summaries", "save_reading_progress", "get_reading_progress"}
+        expected_tools = {"convert_to_markdown", "extract_word_tables", "analyze_word_structure"}
         registered_tools = set(app._tool_manager._tools.keys())
         assert expected_tools.issubset(registered_tools)
@ -409,85 +409,5 @@
         assert "conversion_method" in result["metadata"]


-class TestPageRangeFiltering:
-    """Test page_range content filtering for convert_to_markdown.
-
-    These tests verify that the page_range parameter correctly filters
-    content based on either explicit page breaks or estimated paragraph counts.
-    """
-
-    @pytest.fixture
-    def mixin(self):
-        """Create WordMixin for testing."""
-        app = FastMCP("Test")
-        mixin = WordMixin()
-        mixin.register_all(app)
-        return mixin
-
-    @pytest.mark.asyncio
-    @patch('mcp_office_tools.mixins.word.resolve_office_file_path')
-    @patch('mcp_office_tools.mixins.word.validate_office_file')
-    @patch('mcp_office_tools.mixins.word.detect_format')
-    async def test_page_range_filters_different_content(self, mock_detect, mock_validate, mock_resolve, mixin):
-        """Test that different page_range values return different content.
-
-        This is the key regression test for the page_range bug where
-        include_current_page was set but never used to filter content.
-        """
-        mock_resolve.return_value = "/test.docx"
-        mock_validate.return_value = {"is_valid": True, "errors": []}
-        mock_detect.return_value = {"category": "word", "extension": ".docx", "format_name": "Word Document"}
-
-        with patch.object(mixin, '_analyze_document_size') as mock_analyze:
-            with patch.object(mixin, '_get_processing_recommendation') as mock_recommend:
-                mock_analyze.return_value = {"estimated_pages": 10}
-                mock_recommend.return_value = {"status": "optimal", "message": "", "suggested_workflow": [], "warnings": []}
-
-                # Create mock conversions that return different content per page
-                call_count = [0]
-                def mock_convert_side_effect(*args, **kwargs):
-                    call_count[0] += 1
-                    page_numbers = args[5] if len(args) > 5 else kwargs.get('page_numbers')
-                    if page_numbers == [1, 2]:
-                        return {
-                            "content": "# Page 1-2 Content\n\nThis is from pages 1 and 2.",
-                            "method_used": "python-docx-custom",
-                            "images": [],
-                            "structure": {"headings": [], "tables": 0, "lists": 0, "paragraphs": 5}
-                        }
-                    elif page_numbers == [10, 11]:
-                        return {
-                            "content": "# Page 10-11 Content\n\nThis is from pages 10 and 11.",
-                            "method_used": "python-docx-custom",
-                            "images": [],
-                            "structure": {"headings": [], "tables": 0, "lists": 0, "paragraphs": 5}
-                        }
-                    else:
-                        return {
-                            "content": "# Full Content",
-                            "method_used": "python-docx-custom",
-                            "images": [],
-                            "structure": {"headings": [], "tables": 0, "lists": 0, "paragraphs": 20}
-                        }
-
-                with patch.object(mixin, '_convert_docx_to_markdown', side_effect=mock_convert_side_effect):
-                    # Test page_range 1-2
-                    result_1_2 = await mixin.convert_to_markdown(
-                        file_path="/test.docx",
-                        page_range="1-2"
-                    )
-
-                    # Test page_range 10-11
-                    result_10_11 = await mixin.convert_to_markdown(
-                        file_path="/test.docx",
-                        page_range="10-11"
-                    )
-
-                    # The content should be different for different page ranges
-                    assert "Page 1-2" in result_1_2["markdown"]
-                    assert "Page 10-11" in result_10_11["markdown"]
-                    assert result_1_2["markdown"] != result_10_11["markdown"]
-
-
 if __name__ == "__main__":
     pytest.main([__file__, "-v"])